Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Support namespace-level configuration of migratedClusterUrl in blue-green migration feature #23788

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
Expand Down Expand Up @@ -961,7 +962,8 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
policies -> new LocalPolicies(policies.bundles,
bookieAffinityGroup,
policies.namespaceAntiAffinityGroup,
policies.migrated))
policies.migrated,
policies.migratedClusterUrl))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
bookieAffinityGroup,
null));
Expand Down Expand Up @@ -1781,7 +1783,8 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
lp.map(policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
antiAffinityGroup,
policies.migrated))
policies.migrated,
policies.migratedClusterUrl))
.orElseGet(() -> new LocalPolicies(defaultBundle(),
null, antiAffinityGroup))
);
Expand Down Expand Up @@ -1819,7 +1822,8 @@ protected void internalRemoveNamespaceAntiAffinityGroup() {
new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
null,
policies.migrated));
policies.migrated,
policies.migratedClusterUrl));
log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e);
Expand Down Expand Up @@ -2772,16 +2776,37 @@ protected void internalEnableMigration(boolean migrated) {
policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
policies.namespaceAntiAffinityGroup,
migrated))
migrated,
policies.migratedClusterUrl))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
null, null, migrated)));
null, null, migrated, null)));
log.info("Successfully updated migration on namespace {}", namespaceName);
} catch (Exception e) {
log.error("Failed to update migration on namespace {}", namespaceName, e);
throw new RestException(e);
}
}

protected void internalUpdateMigrationState(boolean migrated, ClusterUrl clusterUrl) {
validateSuperUserAccess();
try {
getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, oldPolicies -> oldPolicies.map(
policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
policies.namespaceAntiAffinityGroup,
migrated,
clusterUrl))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
null, null, migrated, clusterUrl)));
log.info("Successfully updated migration state on namespace {}, migrated={}, clusterUrl={}",
namespaceName, migrated, clusterUrl);
} catch (Exception e) {
log.error("Failed to update migration state on namespace {}, migrated={}, clusterUrl={}",
namespaceName, migrated, clusterUrl, e);
throw new RestException(e);
}
}

protected Policies getDefaultPolicesIfNull(Policies policies) {
if (policies == null) {
policies = new Policies();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -1722,6 +1723,22 @@ public void enableMigration(@PathParam("property") String property,
internalEnableMigration(migrated);
}

@POST
@Path("/{property}/{cluster}/{namespace}/migrationState")
@ApiOperation(hidden = true, value = "Update migration state for a namespace")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void updateMigrationState(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@QueryParam("migrated") boolean migrated,
ClusterUrl clusterUrl) {
validateNamespaceName(property, cluster, namespace);
internalUpdateMigrationState(migrated, clusterUrl);
}

@PUT
@Path("/{property}/{cluster}/{namespace}/policy")
@ApiOperation(value = "Creates a new namespace with the specified policies")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
Expand Down Expand Up @@ -3019,6 +3020,21 @@ public void enableMigration(@PathParam("tenant") String tenant,
internalEnableMigration(migrated);
}

@POST
@Path("/{tenant}/{namespace}/migrationState")
@ApiOperation(hidden = true, value = "Update migration state for a namespace")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void updateMigrationState(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("migrated") boolean migrated,
ClusterUrl clusterUrl) {
validateNamespaceName(tenant, namespace);
internalUpdateMigrationState(migrated, clusterUrl);
}

@POST
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Set dispatcher pause on ack state persistent configuration for specified namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1363,33 +1363,35 @@ public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync
CompletableFuture<Optional<ClusterUrl>> result = new CompletableFuture<>();
pulsar.getPulsarResources().getClusterResources().getClusterPoliciesResources()
.getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
((clusterData, isNamespaceMigrationEnabled) -> {
Optional<ClusterUrl> url = (clusterData.isPresent() && (clusterData.get().isMigrated()
|| isNamespaceMigrationEnabled))
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty();
return url;
.thenCombine(pulsar.getPulsarResources().getLocalPolicies()
.getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject()),
((clusterData, nsLocalPolicies) -> {
boolean isNamespaceMigrated = false;
if (nsLocalPolicies.isPresent() && nsLocalPolicies.get().migrated) {
isNamespaceMigrated = true;
ClusterUrl clusterUrl = nsLocalPolicies.get().migratedClusterUrl;
if (clusterUrl != null && !clusterUrl.isEmpty()) {
return Optional.of(clusterUrl);
}
}
if (clusterData.isPresent() && (clusterData.get().isMigrated() || isNamespaceMigrated)) {
return Optional.ofNullable(clusterData.get().getMigratedClusterUrl());
}
return Optional.empty();
}))
.thenAccept(res -> {
// cluster policies future is completed by metadata-store thread and continuing further
// processing in the same metadata store can cause deadlock while creating topic as
// create topic path may have blocking call on metadata-store. so, complete future on a
// separate thread to avoid deadlock.
pulsar.getExecutor().execute(() -> result.complete(res));
pulsar.getExecutor().execute(() -> result.complete((Optional<ClusterUrl>) res));
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> result.completeExceptionally(ex.getCause()));
return null;
});
return result;
}

private static CompletableFuture<Boolean> isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
return pulsar.getPulsarResources().getLocalPolicies()
.getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenApply(policies -> policies.isPresent() && policies.get().migrated);
}

public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar, String topic) {
try {
return getMigratedClusterUrlAsync(pulsar, topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public LocalPolicies toLocalPolicies() {
return new LocalPolicies(this.getBundlesData(),
localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null),
localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null),
localPolicies.map(lp -> lp.getLeft().migrated).orElse(false));
localPolicies.map(lp -> lp.getLeft().migrated).orElse(false),
localPolicies.map(lp -> lp.getLeft().migratedClusterUrl).orElse(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,111 @@ public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subTyp
client3.close();
}

@Test
public void testNamespaceMigrationWithNamespaceLevelMigratedClusterUrl() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/migrationTopic");

// cluster1 producer/consumer
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("producer1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(topic1.getProducers().isEmpty());
assertFalse(topic1.getSubscriptions().isEmpty());

// build backlog
consumer1.close();
int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}

// cluster2 producer/consumer
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer2 = client2.newProducer().topic(topicName).enableBatching(false)
.producerName("producer2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(topic2.getProducers().isEmpty());
assertTrue(topic2.getSubscriptions().isEmpty());

// cluster3 producer/consumer
@Cleanup
PulsarClient client3 = PulsarClient.builder().serviceUrl(url3.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer3 = client3.newProducer().topic(topicName).enableBatching(false)
.producerName("producer3").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topic3 = (AbstractTopic) pulsar3.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(topic3.getProducers().isEmpty());
assertTrue(topic3.getSubscriptions().isEmpty());

// Set the migratedCluster at both the cluster level and the namespace level. Since the configuration at
// the namespace level has a higher priority, the topic will be migrated to cluster3.
ClusterUrl cluster2MigratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(),
pulsar2.getWebServiceAddressTls(), pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
ClusterUrl cluster3MigratedUrl = new ClusterUrl(pulsar3.getWebServiceAddress(),
pulsar3.getWebServiceAddressTls(), pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, cluster2MigratedUrl);
admin1.namespaces().updateMigrationState(namespace, true, cluster3MigratedUrl);

retryStrategically((test) -> {
try {
topic1.checkClusterMigration().get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
topic1.checkClusterMigration().get();

sleep(1000);
producer1.sendAsync("test1".getBytes());

// producer1 is disconnected from cluster1
retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
assertTrue(topic1.getProducers().isEmpty());
// producer1 is not connected to cluster2
retryStrategically((test) -> topic2.getProducers().size() == 1, 10, 500);
assertEquals(topic2.getProducers().size(), 1);
// producer1 is connected to cluster3
retryStrategically((test) -> topic3.getProducers().size() == 2, 10, 500);
assertEquals(topic3.getProducers().size(), 2);

// try to consume backlog messages from cluster1
consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
for (int i = 0; i < n; i++) {
Message<byte[]> msg = consumer1.receive();
assertEquals(msg.getData(), "test1".getBytes());
consumer1.acknowledge(msg);
}
// after consuming all messages, consumer1 should have disconnected from cluster1 and reconnected to cluster3
retryStrategically((test) -> !topic3.getSubscriptions().isEmpty(), 10, 500);
assertFalse(topic3.getSubscriptions().isEmpty());

// publish messages to cluster3 and consume them
for (int i = 0; i < n; i++) {
producer1.sendAsync("test2".getBytes());
producer3.sendAsync("test2".getBytes());
}
for (int i = 0; i < n * 2; i++) {
Message<byte[]> msg = consumer1.receive();
assertEquals(msg.getData(), "test2".getBytes());
consumer1.acknowledge(msg);
}

client1.close();
client2.close();
client3.close();
}

static class TestBroker extends MockedPulsarServiceBaseTest {

private String clusterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
Expand Down Expand Up @@ -4676,6 +4677,28 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem
*/
void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException;

/**
* Update migration state for a namespace.
* @param namespace
* Namespace name
* @param migrated
* Flag to determine namespace is migrated or not
* @param clusterUrl
* Cluster url data
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateMigrationState(String namespace, boolean migrated, ClusterUrl clusterUrl) throws PulsarAdminException;

/**
* Update migration state for a namespace asynchronously.
*/
CompletableFuture<Void> updateMigrationStateAsync(String namespace, boolean migrated, ClusterUrl clusterUrl);

/**
* Set DispatcherPauseOnAckStatePersistent for a namespace asynchronously.
*/
Expand Down
Loading
Loading