From 5fc0eafab9ea2a4ece7b87218404489c270b64e6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 19 Jun 2024 22:29:17 +0800 Subject: [PATCH] [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856) --- .../pulsar/broker/ServiceConfiguration.java | 6 ++ .../service/persistent/PersistentTopic.java | 9 +- .../broker/service/OneWayReplicatorTest.java | 99 +++++++++++++++++++ .../OneWayReplicatorUsingGlobalZKTest.java | 52 ++++++++++ 4 files changed, 165 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 63ef6f3efe6d0..73bf2316b8287 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1345,6 +1345,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "Max number of snapshot to be cached per subscription.") private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10; + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "The position that replication task start at, it can be set to earliest or latest (default).") + private String replicationStartAt = "latest"; + @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 630712f536874..f3121ec152642 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2041,7 +2041,14 @@ CompletableFuture startReplicator(String remoteCluster) { final CompletableFuture future = new CompletableFuture<>(); String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - ledger.asyncOpenCursor(name, new OpenCursorCallback() { + final InitialPosition initialPosition; + if (MessageId.earliest.toString() + .equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) { + initialPosition = InitialPosition.Earliest; + } else { + initialPosition = InitialPosition.Latest; + } + ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index c9b23c6437a22..e686cd2c94f62 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -36,6 +37,7 @@ import java.lang.reflect.Method; import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.Optional; import java.util.UUID; @@ -71,6 +73,7 @@ import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.naming.TopicName; @@ -912,4 +915,100 @@ public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationL }); } } + + protected void enableReplication(String topic) throws Exception { + admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2)); + } + + protected void disableReplication(String topic) throws Exception { + admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2)); + } + + @Test + public void testConfigReplicationStartAt() throws Exception { + // Initialize. + String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + String subscription1 = "s1"; + admin1.namespaces().createNamespace(ns1); + if (!usingGlobalZK) { + admin2.namespaces().createNamespace(ns1); + } + + RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024); + admin1.namespaces().setRetention(ns1, retentionPolicies); + admin2.namespaces().setRetention(ns1, retentionPolicies); + + // 1. default config. + // Enable replication for topic1. + final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_"); + admin1.topics().createNonPartitionedTopicAsync(topic1); + admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest); + Producer p1 = client1.newProducer(Schema.STRING).topic(topic1).create(); + p1.send("msg-1"); + p1.close(); + enableReplication(topic1); + // Verify: since the replication was started at latest, there is no message to consume. + Consumer c1 = client2.newConsumer(Schema.STRING).topic(topic1).subscriptionName(subscription1) + .subscribe(); + Message msg1 = c1.receive(2, TimeUnit.SECONDS); + assertNull(msg1); + c1.close(); + disableReplication(topic1); + + // 2.Update config: start at "earliest". + admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString()); + Awaitility.await().untilAsserted(() -> { + pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest"); + }); + + final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_"); + admin1.topics().createNonPartitionedTopicAsync(topic2); + admin1.topics().createSubscription(topic2, subscription1, MessageId.earliest); + Producer p2 = client1.newProducer(Schema.STRING).topic(topic2).create(); + p2.send("msg-1"); + p2.close(); + enableReplication(topic2); + // Verify: since the replication was started at earliest, there is one message to consume. + Consumer c2 = client2.newConsumer(Schema.STRING).topic(topic2).subscriptionName(subscription1) + .subscribe(); + Message msg2 = c2.receive(2, TimeUnit.SECONDS); + assertNotNull(msg2); + assertEquals(msg2.getValue(), "msg-1"); + c2.close(); + disableReplication(topic2); + + // 2.Update config: start at "latest". + admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString()); + Awaitility.await().untilAsserted(() -> { + pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest"); + }); + + final String topic3 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_"); + admin1.topics().createNonPartitionedTopicAsync(topic3); + admin1.topics().createSubscription(topic3, subscription1, MessageId.earliest); + Producer p3 = client1.newProducer(Schema.STRING).topic(topic3).create(); + p3.send("msg-1"); + p3.close(); + enableReplication(topic3); + // Verify: since the replication was started at latest, there is no message to consume. + Consumer c3 = client2.newConsumer(Schema.STRING).topic(topic3).subscriptionName(subscription1) + .subscribe(); + Message msg3 = c3.receive(2, TimeUnit.SECONDS); + assertNull(msg3); + c3.close(); + disableReplication(topic3); + + // cleanup. + // There is no good way to delete topics when using global ZK, skip cleanup. + admin1.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster1)); + admin1.namespaces().unload(ns1); + admin2.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster2)); + admin2.namespaces().unload(ns1); + admin1.topics().delete(topic1, false); + admin2.topics().delete(topic1, false); + admin1.topics().delete(topic2, false); + admin2.topics().delete(topic2, false); + admin1.topics().delete(topic3, false); + admin2.topics().delete(topic3, false); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index b8f8edce2477e..31e94f435f0f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -18,7 +18,19 @@ */ package org.apache.pulsar.broker.service; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.Arrays; +import java.util.HashSet; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -109,4 +121,44 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception { super.testReloadWithTopicLevelGeoReplication(replicationLevel); } + + @Test + @Override + public void testConfigReplicationStartAt() throws Exception { + // Initialize. + String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + String subscription1 = "s1"; + admin1.namespaces().createNamespace(ns1); + RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024); + admin1.namespaces().setRetention(ns1, retentionPolicies); + admin2.namespaces().setRetention(ns1, retentionPolicies); + + // Update config: start at "earliest". + admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString()); + Awaitility.await().untilAsserted(() -> { + pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest"); + }); + + // Verify: since the replication was started at earliest, there is one message to consume. + final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_"); + admin1.topics().createNonPartitionedTopicAsync(topic1); + admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest); + org.apache.pulsar.client.api.Producer p1 = client1.newProducer(Schema.STRING).topic(topic1).create(); + p1.send("msg-1"); + p1.close(); + + admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + org.apache.pulsar.client.api.Consumer c1 = client2.newConsumer(Schema.STRING).topic(topic1) + .subscriptionName(subscription1).subscribe(); + Message msg2 = c1.receive(2, TimeUnit.SECONDS); + assertNotNull(msg2); + assertEquals(msg2.getValue(), "msg-1"); + c1.close(); + + // cleanup. + admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString()); + Awaitility.await().untilAsserted(() -> { + pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest"); + }); + } }