diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 4fb0022194a02..b21fe7acfdb6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; @@ -220,6 +221,23 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { private void startNewSnapshot() { cleanupTimedOutSnapshots(); + if (lastCompletedSnapshotStartTime == 0 && !pendingSnapshots.isEmpty()) { + // 1. If the remote cluster has disabled subscription replication or there's an incorrect config, + // it will not respond to SNAPSHOT_REQUEST. Therefore, lastCompletedSnapshotStartTime will remain 0, + // making it unnecessary to resend the request. + // 2. This approach prevents sending additional SNAPSHOT_REQUEST to both local_topic and remote_topic. + // 3. Since it's uncertain when the remote cluster will enable subscription replication, + // the timeout mechanism of pendingSnapshots is used to ensure retries. + // + // In other words, when hit this case, The frequency of sending SNAPSHOT_REQUEST + // will use `replicatedSubscriptionsSnapshotTimeoutSeconds`. + if (log.isDebugEnabled()) { + log.debug("[{}] PendingSnapshot exists but has never succeeded. " + + "Skipping snapshot creation until pending snapshot timeout.", topic.getName()); + } + return; + } + if (topic.getLastMaxReadPositionMovedForwardTimestamp() < lastCompletedSnapshotStartTime || topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) { // There was no message written since the last snapshot, we can skip creating a new snapshot @@ -324,6 +342,11 @@ String localCluster() { return localCluster; } + @VisibleForTesting + public ConcurrentMap pendingSnapshots() { + return pendingSnapshots; + } + @Override public boolean isMarkerMessage() { // Everything published by this controller will be a marker a message diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java index 5b896a22baa33..0f527993bba59 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java @@ -64,6 +64,10 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TransactionIsolationLevel; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -1002,6 +1006,92 @@ public void testReplicatedSubscriptionWithCompaction() throws Exception { Assert.assertEquals(result, List.of("V2")); } + @Test + public void testReplicatedSubscriptionOneWay() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar-r4/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/one-way"; + int defaultSubscriptionsSnapshotFrequency = config1.getReplicatedSubscriptionsSnapshotFrequencyMillis(); + int defaultSubscriptionsSnapshotTimeout = config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds(); + config1.setReplicatedSubscriptionsSnapshotTimeoutSeconds(2); + config1.setReplicatedSubscriptionsSnapshotFrequencyMillis(100); + + // cluster4 disabled ReplicatedSubscriptions + admin1.tenants().createTenant("pulsar-r4", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid4"), Sets.newHashSet(cluster1, cluster4))); + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster4)); + + String subscriptionName = "cluster-subscription"; + boolean replicateSubscriptionState = true; + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + @Cleanup + final PulsarClient client4 = PulsarClient.builder().serviceUrl(url4.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + // create subscription in cluster4 + createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState); + // create subscription in cluster4 + createReplicatedSubscription(client4, topicName, subscriptionName, replicateSubscriptionState); + + // send messages in cluster1 + @Cleanup + Producer producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + int numMessages = 6; + for (int i = 0; i < numMessages; i++) { + String body = "message" + i; + producer.send(body.getBytes(StandardCharsets.UTF_8)); + } + producer.close(); + + // wait for snapshot marker request to be replicated + Thread.sleep(3 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + // Assert just have 1 pending snapshot in cluster1 + final PersistentTopic topic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + ReplicatedSubscriptionsController r1Controller = + topic1.getReplicatedSubscriptionController().get(); + assertEquals(r1Controller.pendingSnapshots().size(), 1); + + // Assert cluster4 just receive 1 snapshot request msg + int numSnapshotRequest = 0; + List> r4Messages = admin4.topics() + .peekMessages(topicName, subscriptionName, 100, true, TransactionIsolationLevel.READ_UNCOMMITTED); + for (Message r4Message : r4Messages) { + MessageMetadata msgMetadata = ((MessageImpl) r4Message).getMessageBuilder(); + if (msgMetadata.hasMarkerType() && msgMetadata.getMarkerType() == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE) { + numSnapshotRequest++; + } + } + Assert.assertEquals(numSnapshotRequest, 1); + + // Wait pending snapshot timeout + Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000); + numSnapshotRequest = 0; + r4Messages = admin4.topics() + .peekMessages(topicName, subscriptionName, 100, true, TransactionIsolationLevel.READ_UNCOMMITTED); + for (Message r4Message : r4Messages) { + MessageMetadata msgMetadata = ((MessageImpl) r4Message).getMessageBuilder(); + if (msgMetadata.hasMarkerType() && msgMetadata.getMarkerType() == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE) { + numSnapshotRequest++; + } + } + Assert.assertEquals(numSnapshotRequest, 2); + + // Set back to default config. + config1.setReplicatedSubscriptionsSnapshotTimeoutSeconds(defaultSubscriptionsSnapshotTimeout); + config1.setReplicatedSubscriptionsSnapshotFrequencyMillis(defaultSubscriptionsSnapshotFrequency); + } + /** * Disable replication subscription. * Test scheduled task case.