[improve][broker] Unblock stuck Key_Shared subscription after consumer reconnect
Nikolai Borisov committed Nov 13, 2023
1 parent 7c6a4b8 commit 4c8e122
Showing 8 changed files with 312 additions and 26 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
@@ -336,6 +336,14 @@ maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
# Broker periodically checks if subscription is stuck and unblocks it if this flag is enabled. (Default is disabled)
unblockStuckSubscriptionEnabled=false

# If enabled, the subscription stores the keys of not-yet-acknowledged messages so that
# recently joined consumers do not get stuck. This overcomes the situation where new
# Key_Shared consumers receive no messages until a consumer that already holds unacked
# messages disconnects or acks/nacks some of them.
# The trade-off is that every unacked message in the subscription must be tracked,
# which could lead to performance degradation and higher memory consumption.
rememberNotAckedMessagesKey=false

# Tick time to schedule task that checks topic publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
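As a usage sketch, an operator seeing recently joined Key_Shared consumers starve would opt in by setting the flag above in conf/broker.conf (it defaults to disabled):

rememberNotAckedMessagesKey=true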
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -889,6 +889,16 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
+ " unacked messages than this percentage limit and subscription will not receive any new messages "
+ " until that subscription acks back `limit/2` messages")
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "If enabled, the subscription stores the keys of not-yet-acknowledged messages so that "
+ "recently joined consumers do not get stuck. This overcomes the situation where new "
+ "Key_Shared consumers receive no messages until a consumer that already holds unacked "
+ "messages disconnects or acks/nacks some of them. "
+ "The trade-off is that every unacked message in the subscription must be tracked, "
+ "which could lead to performance degradation and higher memory consumption."
)
private boolean rememberNotAckedMessagesKey = false;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Maximum size of Consumer metadata")
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -323,6 +323,7 @@ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatch
unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality());
}
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash);
subscription.addPendingMessageKey(entry, subscription.getName(), consumerId);
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in"
+ " broker.service.Consumer for consumerId: {}",
@@ -984,6 +985,7 @@ private boolean removePendingAcks(PositionImpl position) {
? ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId())
: null;
if (ackedPosition != null) {
subscription.removePendingMessageKey(position.getEntryId());
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return false;
@@ -1102,6 +1104,7 @@ private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
private void clearUnAckedMsgs() {
int unaAckedMsgs = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0);
subscription.addUnAckedMessages(-unaAckedMsgs);
subscription.cleanPendingMessageKeys();
}

public boolean isPreciseDispatcherFlowControl() {
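Taken together, the three subscription calls added to Consumer above give every in-flight entry a key-tracking lifecycle. A condensed sketch of the sequence, reusing the calls from this diff (variable names as they appear above):

// on dispatch: remember the key of the in-flight entry and its owning consumer
subscription.addPendingMessageKey(entry, subscription.getName(), consumerId);
// on ack removal: forget the key so other consumers may receive it again
subscription.removePendingMessageKey(position.getEntryId());
// on unacked-state reset (e.g. consumer close): drop all tracked keys at once
subscription.cleanPendingMessageKeys();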
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -104,6 +104,26 @@ default long getNumberOfEntriesDelayed() {

boolean isSubscriptionMigrated();

default boolean isPendingAckMessageKeysRemembered() {
return false;
}

default void addPendingMessageKey(Entry pendingEntry, String subscription, long consumerId) {
// Default is no-op
}

default void removePendingMessageKey(long pendingEntryId) {
// Default is no-op
}

default void cleanPendingMessageKeys() {
// Default is no-op
}

default boolean couldSendToConsumer(String messageKey, long consumerId) {
// Default: message keys are not tracked, so any consumer may receive any key
return true;
}

default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// Default is no-op
}
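A dispatcher honoring this contract first checks whether key tracking is enabled and then consults couldSendToConsumer per message key. A minimal sketch under assumed names (messageKey and entry come from the read batch; dispatchTo is a hypothetical helper, while addMessageToReplay and getStickyKeyHash exist in the dispatcher changed below):

if (!subscription.isPendingAckMessageKeysRemembered()
|| subscription.couldSendToConsumer(messageKey, consumer.consumerId())) {
// safe to send: no other consumer holds unacked messages with this key
dispatchTo(consumer, entry);
} else {
// another consumer owns this key's pending messages; mark for redelivery
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), getStickyKeyHash(entry));
}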
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -200,6 +200,9 @@ protected boolean isConsumersExceededOnSubscription() {
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
// decrement unack-message count for removed consumer
addUnAckedMessages(-consumer.getUnackedMessages());
if (subscription.isPendingAckMessageKeysRemembered()) {
consumer.getPendingAcks().keys().forEach(value -> subscription.removePendingMessageKey(value.second));
}
if (consumerSet.removeAll(consumer) == 1) {
consumerList.remove(consumer);
log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size());
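One subtlety in the cleanup above: getPendingAcks() is keyed by (ledgerId, entryId) pairs, so value.second is the entry id, which matches the removePendingMessageKey(long pendingEntryId) signature. A commented restatement of the lambda:

// pendingAcks keys are (ledgerId, entryId) pairs; `second` is the entryId,
// the same id PersistentSubscription uses as its tracking-map key
consumer.getPendingAcks().keys()
.forEach(value -> subscription.removePendingMessageKey(value.second));

Releasing every key held by the departing consumer is what lets recently joined consumers start receiving those keys immediately, which is the behavior this commit targets.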
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -37,6 +37,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
@@ -52,6 +53,8 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -256,41 +259,47 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
availablePermits = Math.min(availablePermits, remainUnAckedMessages);
}
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
Pair<Integer, List<Entry>> messagesWithEntries = getRestrictedMaxEntriesForConsumer(
consumer,
entriesWithSameKey,
maxMessagesForC,
readType, consumerStickyKeyHashesMap.get(consumer)
);
int messagesForC = messagesWithEntries.getKey();
List<Entry> toDispatch = messagesWithEntries.getValue();
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read type is {}",
name, consumer.consumerName(), messagesForC, readType);
}

if (messagesForC < toDispatch.size()) {
// We are not able to push all the messages with given key to its consumer,
// so we discard for now and mark them for later redelivery
for (int i = messagesForC; i < toDispatch.size(); i++) {
Entry entry = toDispatch.get(i);
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
toDispatch.set(i, null);
}
}

if (messagesForC > 0) {
// remove positions from the replay list first: sendMessages recycles entries
if (readType == ReadType.Replay) {
for (int i = 0; i < messagesForC; i++) {
Entry entry = toDispatch.get(i);
redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
}
}

SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
totalEntries += filterEntriesForConsumer(toDispatch, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);

consumer.sendMessages(toDispatch, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
getRedeliveryTracker()).addListener(future -> {
@@ -332,27 +341,37 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
return false;
}

private Pair<Integer, List<Entry>> getRestrictedMaxEntriesForConsumer(
Consumer consumer,
List<Entry> entries,
int maxMessages,
ReadType readType,
Set<Integer> stickyKeyHashes
) {
if (maxMessages == 0) {
return Pair.of(0, entries);
}
if (readType == ReadType.Normal && stickyKeyHashes != null
&& redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
// If redeliveryMessages contains messages that correspond to the same hash as the messages
// that the dispatcher is trying to send, do not send those messages for order guarantee
return Pair.of(0, entries);
}
if (recentlyJoinedConsumers == null) {
return Pair.of(maxMessages, entries);
}
removeConsumersFromRecentJoinedConsumers();
PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
// At this point, all the old messages were already consumed and this consumer
// is now ready to receive any message
if (maxReadPosition == null) {
// The consumer has not recently joined, so we can send all messages
return Pair.of(maxMessages, entries);
}

if (subscription.isPendingAckMessageKeysRemembered()) {
// If pending-ack message keys are tracked, there is no need to block recently joined consumers by position
return getRestrictedEntriesForConsumerPendingAck(entries, consumer);
}

// If the read type is Replay, avoid sending messages held by other consumers to the new consumers,
@@ -381,11 +400,38 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
// We have already crossed the divider line. All messages in the list are now
// newer than what we can currently dispatch to this consumer
return Pair.of(i, entries);
}
}

return Pair.of(maxMessages, entries);
}

private Pair<Integer, List<Entry>> getRestrictedEntriesForConsumerPendingAck(
List<Entry> entries,
Consumer consumer
) {
List<Entry> filtered = new ArrayList<>(entries.size());
// This consumer recently joined, so skip messages whose keys are still pending on another consumer
for (Entry entry : entries) {
MessageMetadata metadata = Commands.peekAndCopyMessageMetadata(
entry.getDataBuffer(),
subscription.getName(),
consumer.consumerId()
);

if (metadata == null || !metadata.hasPartitionKey()
|| subscription.couldSendToConsumer(metadata.getPartitionKey(), consumer.consumerId())) {
filtered.add(entry);
} else {
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
}
}

return Pair.of(filtered.size(), filtered);
}
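To make the filtering concrete, a hypothetical walk-through: suppose a read returns three entries with partition keys k1, k2, k1 while k1 still has messages pending on consumer A. For a recently joined consumer B, the method keeps only the k2 entry and sends both k1 entries back to the replay set; for consumer A itself all three pass, because couldSendToConsumer allows the key's current holder:

// hypothetical entries e1(k1), e2(k2), e3(k1); key k1 pending on consumer A
Pair<Integer, List<Entry>> result = getRestrictedEntriesForConsumerPendingAck(entries, consumerB);
// result.getKey() == 1 and result.getValue() contains only e2;
// e1 and e3 were released and added to the replay set for later redelivery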

@Override
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -30,6 +30,7 @@
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
@@ -55,6 +56,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractSubscription;
@@ -124,8 +126,11 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
Map.of(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Map.of();

private final Map<Long, Pair<Long, String>> pendingMessages = new ConcurrentHashMap<>();

private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private final PendingAckHandle pendingAckHandle;
private final boolean shouldRememberUnackedMessageKey;
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;

Expand Down Expand Up @@ -159,6 +164,11 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
} else {
this.pendingAckHandle = new PendingAckHandleDisabled();
}
shouldRememberUnackedMessageKey = topic
.getBrokerService()
.getPulsar()
.getConfig()
.isRememberNotAckedMessagesKey();
IS_FENCED_UPDATER.set(this, FALSE);
}

@@ -1264,6 +1274,52 @@ public boolean isSubscriptionMigrated() {
return topic.isMigrated() && cursor.getNumberOfEntriesInBacklog(true) <= 0;
}

@Override
public boolean isPendingAckMessageKeysRemembered() {
return shouldRememberUnackedMessageKey;
}

@Override
public void addPendingMessageKey(Entry pendingMessage, String subscription, long consumerId) {
if (shouldRememberUnackedMessageKey && pendingMessage != null) {
MessageMetadata metadata = Commands.peekAndCopyMessageMetadata(
pendingMessage.getDataBuffer(),
subscription,
consumerId
);
if (metadata != null && metadata.hasPartitionKey()) {
pendingMessages.put(pendingMessage.getEntryId(), Pair.of(consumerId, metadata.getPartitionKey()));
}
}
}

@Override
public void removePendingMessageKey(long pendingEntryId) {
if (shouldRememberUnackedMessageKey) {
pendingMessages.remove(pendingEntryId);
}
}

@Override
public void cleanPendingMessageKeys() {
if (shouldRememberUnackedMessageKey) {
pendingMessages.clear();
}
}

@Override
public boolean couldSendToConsumer(String messageKey, long consumerId) {
if (!shouldRememberUnackedMessageKey) {
return true;
}
for (Pair<Long, String> pendingMessageKey : pendingMessages.values()) {
if (messageKey.equals(pendingMessageKey.getValue()) && !pendingMessageKey.getKey().equals(consumerId)) {
return false;
}
}
return true;
}
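The map semantics can be captured in a unit-test-style sketch (entryK1 is a hypothetical entry whose metadata carries partition key "k1", with the flag enabled): a key blocks every consumer except its current holder, and removing the entry releases it:

subscription.addPendingMessageKey(entryK1, "sub", 1L); // consumer 1 now holds key "k1"
assert subscription.couldSendToConsumer("k1", 1L); // the holder may keep receiving it
assert !subscription.couldSendToConsumer("k1", 2L); // any other consumer is blocked
subscription.removePendingMessageKey(entryK1.getEntryId());
assert subscription.couldSendToConsumer("k1", 2L); // released after the ack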

@Override
public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;