diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ca4c685b2806a..53d1b147079a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -88,6 +88,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -961,7 +962,8 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi policies -> new LocalPolicies(policies.bundles, bookieAffinityGroup, policies.namespaceAntiAffinityGroup, - policies.migrated)) + policies.migrated, + policies.migratedClusterUrl)) .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), bookieAffinityGroup, null)); @@ -1781,7 +1783,8 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { lp.map(policies -> new LocalPolicies(policies.bundles, policies.bookieAffinityGroup, antiAffinityGroup, - policies.migrated)) + policies.migrated, + policies.migratedClusterUrl)) .orElseGet(() -> new LocalPolicies(defaultBundle(), null, antiAffinityGroup)) ); @@ -1819,7 +1822,8 @@ protected void internalRemoveNamespaceAntiAffinityGroup() { new LocalPolicies(policies.bundles, policies.bookieAffinityGroup, null, - policies.migrated)); + policies.migrated, + policies.migratedClusterUrl)); log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName); } catch (Exception e) { log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e); @@ -2772,9 +2776,10 @@ protected void internalEnableMigration(boolean migrated) { policies -> new LocalPolicies(policies.bundles, policies.bookieAffinityGroup, policies.namespaceAntiAffinityGroup, - migrated)) + migrated, + policies.migratedClusterUrl)) .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), - null, null, migrated))); + null, null, migrated, null))); log.info("Successfully updated migration on namespace {}", namespaceName); } catch (Exception e) { log.error("Failed to update migration on namespace {}", namespaceName, e); @@ -2782,6 +2787,26 @@ protected void internalEnableMigration(boolean migrated) { } } + protected void internalUpdateMigrationState(boolean migrated, ClusterUrl clusterUrl) { + validateSuperUserAccess(); + try { + getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, oldPolicies -> oldPolicies.map( + policies -> new LocalPolicies(policies.bundles, + policies.bookieAffinityGroup, + policies.namespaceAntiAffinityGroup, + migrated, + clusterUrl)) + .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), + null, null, migrated, clusterUrl))); + log.info("Successfully updated migration state on namespace {}, migrated={}, clusterUrl={}", + namespaceName, migrated, clusterUrl); + } catch (Exception e) { + log.error("Failed to update migration state on namespace {}, migrated={}, clusterUrl={}", + namespaceName, migrated, clusterUrl, e); + throw new RestException(e); + } + } + protected Policies getDefaultPolicesIfNull(Policies policies) { if (policies == null) { policies = new Policies(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 9cf394e77f4f2..5fafffc36e681 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -57,6 +57,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; @@ -1722,6 +1723,22 @@ public void enableMigration(@PathParam("property") String property, internalEnableMigration(migrated); } + @POST + @Path("/{property}/{cluster}/{namespace}/migrationState") + @ApiOperation(hidden = true, value = "Update migration state for a namespace") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Operation successful"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) + public void updateMigrationState(@PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + @QueryParam("migrated") boolean migrated, + ClusterUrl clusterUrl) { + validateNamespaceName(property, cluster, namespace); + internalUpdateMigrationState(migrated, clusterUrl); + } + @PUT @Path("/{property}/{cluster}/{namespace}/policy") @ApiOperation(value = "Creates a new namespace with the specified policies") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 36150ee21b32c..c042e096a8b57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -61,6 +61,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -3019,6 +3020,21 @@ public void enableMigration(@PathParam("tenant") String tenant, internalEnableMigration(migrated); } + @POST + @Path("/{tenant}/{namespace}/migrationState") + @ApiOperation(hidden = true, value = "Update migration state for a namespace") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Operation successful"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) + public void updateMigrationState(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @QueryParam("migrated") boolean migrated, + ClusterUrl clusterUrl) { + validateNamespaceName(tenant, namespace); + internalUpdateMigrationState(migrated, clusterUrl); + } + @POST @Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent") @ApiOperation(value = "Set dispatcher pause on ack state persistent configuration for specified namespace.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 69a38bc50de9d..68da400a39c11 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1363,20 +1363,28 @@ public static CompletableFuture> getMigratedClusterUrlAsync CompletableFuture> result = new CompletableFuture<>(); pulsar.getPulsarResources().getClusterResources().getClusterPoliciesResources() .getClusterPoliciesAsync(pulsar.getConfig().getClusterName()) - .thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic), - ((clusterData, isNamespaceMigrationEnabled) -> { - Optional url = (clusterData.isPresent() && (clusterData.get().isMigrated() - || isNamespaceMigrationEnabled)) - ? Optional.ofNullable(clusterData.get().getMigratedClusterUrl()) - : Optional.empty(); - return url; + .thenCombine(pulsar.getPulsarResources().getLocalPolicies() + .getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject()), + ((clusterData, nsLocalPolicies) -> { + boolean isNamespaceMigrated = false; + if (nsLocalPolicies.isPresent() && nsLocalPolicies.get().migrated) { + isNamespaceMigrated = true; + ClusterUrl clusterUrl = nsLocalPolicies.get().migratedClusterUrl; + if (clusterUrl != null && !clusterUrl.isEmpty()) { + return Optional.of(clusterUrl); + } + } + if (clusterData.isPresent() && (clusterData.get().isMigrated() || isNamespaceMigrated)) { + return Optional.ofNullable(clusterData.get().getMigratedClusterUrl()); + } + return Optional.empty(); })) .thenAccept(res -> { // cluster policies future is completed by metadata-store thread and continuing further // processing in the same metadata store can cause deadlock while creating topic as // create topic path may have blocking call on metadata-store. so, complete future on a // separate thread to avoid deadlock. - pulsar.getExecutor().execute(() -> result.complete(res)); + pulsar.getExecutor().execute(() -> result.complete((Optional) res)); }).exceptionally(ex -> { pulsar.getExecutor().execute(() -> result.completeExceptionally(ex.getCause())); return null; @@ -1384,12 +1392,6 @@ public static CompletableFuture> getMigratedClusterUrlAsync return result; } - private static CompletableFuture isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) { - return pulsar.getPulsarResources().getLocalPolicies() - .getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject()) - .thenApply(policies -> policies.isPresent() && policies.get().migrated); - } - public static Optional getMigratedClusterUrl(PulsarService pulsar, String topic) { try { return getMigratedClusterUrlAsync(pulsar, topic) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index 3ee365cdd4571..e6240b5ef8846 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -198,6 +198,7 @@ public LocalPolicies toLocalPolicies() { return new LocalPolicies(this.getBundlesData(), localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null), localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null), - localPolicies.map(lp -> lp.getLeft().migrated).orElse(false)); + localPolicies.map(lp -> lp.getLeft().migrated).orElse(false), + localPolicies.map(lp -> lp.getLeft().migratedClusterUrl).orElse(null)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index 167c154c1fd88..de04baccaf166 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -1192,6 +1192,111 @@ public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subTyp client3.close(); } + @Test + public void testNamespaceMigrationWithNamespaceLevelMigratedClusterUrl() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/migrationTopic"); + + // cluster1 producer/consumer + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + Producer producer1 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("producer1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) + .subscriptionName("s1").subscribe(); + AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); + retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500); + assertFalse(topic1.getProducers().isEmpty()); + assertFalse(topic1.getSubscriptions().isEmpty()); + + // build backlog + consumer1.close(); + int n = 5; + for (int i = 0; i < n; i++) { + producer1.send("test1".getBytes()); + } + + // cluster2 producer/consumer + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + Producer producer2 = client2.newProducer().topic(topicName).enableBatching(false) + .producerName("producer2").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get(); + assertFalse(topic2.getProducers().isEmpty()); + assertTrue(topic2.getSubscriptions().isEmpty()); + + // cluster3 producer/consumer + @Cleanup + PulsarClient client3 = PulsarClient.builder().serviceUrl(url3.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + Producer producer3 = client3.newProducer().topic(topicName).enableBatching(false) + .producerName("producer3").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic topic3 = (AbstractTopic) pulsar3.getBrokerService().getTopic(topicName, false).getNow(null).get(); + assertFalse(topic3.getProducers().isEmpty()); + assertTrue(topic3.getSubscriptions().isEmpty()); + + // Set the migratedCluster at both the cluster level and the namespace level. Since the configuration at + // the namespace level has a higher priority, the topic will be migrated to cluster3. + ClusterUrl cluster2MigratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), + pulsar2.getWebServiceAddressTls(), pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); + ClusterUrl cluster3MigratedUrl = new ClusterUrl(pulsar3.getWebServiceAddress(), + pulsar3.getWebServiceAddressTls(), pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()); + admin1.clusters().updateClusterMigration("r1", true, cluster2MigratedUrl); + admin1.namespaces().updateMigrationState(namespace, true, cluster3MigratedUrl); + + retryStrategically((test) -> { + try { + topic1.checkClusterMigration().get(); + return true; + } catch (Exception e) { + // ok + } + return false; + }, 10, 500); + topic1.checkClusterMigration().get(); + + sleep(1000); + producer1.sendAsync("test1".getBytes()); + + // producer1 is disconnected from cluster1 + retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500); + assertTrue(topic1.getProducers().isEmpty()); + // producer1 is not connected to cluster2 + retryStrategically((test) -> topic2.getProducers().size() == 1, 10, 500); + assertEquals(topic2.getProducers().size(), 1); + // producer1 is connected to cluster3 + retryStrategically((test) -> topic3.getProducers().size() == 2, 10, 500); + assertEquals(topic3.getProducers().size(), 2); + + // try to consume backlog messages from cluster1 + consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); + for (int i = 0; i < n; i++) { + Message msg = consumer1.receive(); + assertEquals(msg.getData(), "test1".getBytes()); + consumer1.acknowledge(msg); + } + // after consuming all messages, consumer1 should have disconnected from cluster1 and reconnected to cluster3 + retryStrategically((test) -> !topic3.getSubscriptions().isEmpty(), 10, 500); + assertFalse(topic3.getSubscriptions().isEmpty()); + + // publish messages to cluster3 and consume them + for (int i = 0; i < n; i++) { + producer1.sendAsync("test2".getBytes()); + producer3.sendAsync("test2".getBytes()); + } + for (int i = 0; i < n * 2; i++) { + Message msg = consumer1.receive(); + assertEquals(msg.getData(), "test2".getBytes()); + consumer1.acknowledge(msg); + } + + client1.close(); + client2.close(); + client3.close(); + } + static class TestBroker extends MockedPulsarServiceBaseTest { private String clusterName; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 28ad852064b4f..526f494732732 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -4676,6 +4677,28 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem */ void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException; + /** + * Update migration state for a namespace. + * @param namespace + * Namespace name + * @param migrated + * Flag to determine namespace is migrated or not + * @param clusterUrl + * Cluster url data + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateMigrationState(String namespace, boolean migrated, ClusterUrl clusterUrl) throws PulsarAdminException; + + /** + * Update migration state for a namespace asynchronously. + */ + CompletableFuture updateMigrationStateAsync(String namespace, boolean migrated, ClusterUrl clusterUrl); + /** * Set DispatcherPauseOnAckStatePersistent for a namespace asynchronously. */ diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7695abdd4809b..20b79890b176f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -43,6 +43,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -1933,6 +1934,20 @@ public CompletableFuture updateMigrationStateAsync(String namespace, boole return asyncPostRequest(path, Entity.entity(migrated, MediaType.APPLICATION_JSON)); } + @Override + public void updateMigrationState(String namespace, boolean migrated, ClusterUrl clusterUrl) + throws PulsarAdminException { + sync(() -> updateMigrationStateAsync(namespace, migrated, clusterUrl)); + } + + @Override + public CompletableFuture updateMigrationStateAsync(String namespace, boolean migrated, + ClusterUrl clusterUrl) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "migrationState").queryParam("migrated", migrated); + return asyncPostRequest(path, Entity.entity(clusterUrl, MediaType.APPLICATION_JSON)); + } + private WebTarget namespacePath(NamespaceName namespace, String... parts) { final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces; WebTarget namespacePath = base.path(namespace.toString()); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 5f8c9f49d65d1..0184c0a061a16 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -100,6 +100,7 @@ import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.FailureDomain; @@ -913,6 +914,11 @@ public void namespaces() throws Exception { namespaces.run(split("remove-dispatcher-pause-on-ack-state-persistent myprop/clust/ns1")); verify(mockNamespaces).removeDispatcherPauseOnAckStatePersistent("myprop/clust/ns1"); + namespaces.run(split( + "update-migration-state --migrated --service-url serviceUrl --service-url-secure serviceUrlTls " + + "--broker-url brokerServiceUrl --broker-url-secure brokerServiceUrlTls myprop/clust/ns1")); + verify(mockNamespaces).updateMigrationState("myprop/clust/ns1", true, + new ClusterUrl("serviceUrl", "serviceUrlTls", "brokerServiceUrl", "brokerServiceUrlTls")); } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 8adedcd14ac40..a8ab3362ffc6e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -2553,10 +2554,24 @@ private class UpdateMigrationState extends CliCommand { @Option(names = "--migrated", description = "Is namespace migrated") private boolean migrated; + @Option(names = "--service-url", description = "New migrated cluster service url") + private String serviceUrl; + + @Option(names = "--service-url-secure", + description = "New migrated cluster service url secure") + private String serviceUrlTls; + + @Option(names = "--broker-url", description = "New migrated cluster broker service url") + private String brokerServiceUrl; + + @Option(names = "--broker-url-secure", description = "New migrated cluster broker service url secure") + private String brokerServiceUrlTls; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(namespaceName); - getAdmin().namespaces().updateMigrationState(namespace, migrated); + ClusterUrl clusterUrl = new ClusterUrl(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls); + getAdmin().namespaces().updateMigrationState(namespace, migrated, clusterUrl); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java index 43f5130eb9fe8..a9336e8453d89 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle; import lombok.EqualsAndHashCode; import lombok.ToString; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; /** * Local policies. @@ -35,28 +36,32 @@ public class LocalPolicies { // namespace anti-affinity-group public final String namespaceAntiAffinityGroup; public final boolean migrated; + public final ClusterUrl migratedClusterUrl; public LocalPolicies() { bundles = defaultBundle(); bookieAffinityGroup = null; namespaceAntiAffinityGroup = null; migrated = false; + migratedClusterUrl = null; } public LocalPolicies(BundlesData data, BookieAffinityGroupData bookieAffinityGroup, String namespaceAntiAffinityGroup) { - this(data, bookieAffinityGroup, namespaceAntiAffinityGroup, false); + this(data, bookieAffinityGroup, namespaceAntiAffinityGroup, false, null); } public LocalPolicies(BundlesData data, BookieAffinityGroupData bookieAffinityGroup, String namespaceAntiAffinityGroup, - boolean migrated) { + boolean migrated, + ClusterUrl migratedClusterUrl) { bundles = data; this.bookieAffinityGroup = bookieAffinityGroup; this.namespaceAntiAffinityGroup = namespaceAntiAffinityGroup; this.migrated = migrated; + this.migratedClusterUrl = migratedClusterUrl; } } \ No newline at end of file