[fix][broker] Fix possible mark delete NPE when batch index ack is enabled (#23833)

BewareMyPower authored Jan 14, 2025
1 parent f1f65a5 commit c92930f
Showing 4 changed files with 65 additions and 92 deletions.
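The change follows one pattern throughout the diff: every guard of the form getConfig().isDeletionAtBatchIndexLevelEnabled() around batchDeletedIndexes is replaced by a null check on the field itself, and the map values switch from the pooled BitSetRecyclable to plain java.util.BitSet. The following minimal sketch only illustrates the resulting guard style; the class name and the Long key are made up for the example and are not part of the patch. It assumes the map is created once at cursor construction and stays null when the feature is off.

import java.util.BitSet;
import java.util.concurrent.ConcurrentSkipListMap;

class GuardSketch {
    // Null when batch-index deletion was disabled at creation time; the config flag
    // alone cannot tell us whether this field was ever initialized.
    static ConcurrentSkipListMap<Long, BitSet> batchDeletedIndexes = null;

    public static void main(String[] args) {
        // Old style (sketch): if (configFlag) { batchDeletedIndexes.remove(1L); }  -> possible NPE
        // New style: guard on the field itself, so the dereference is always safe.
        if (batchDeletedIndexes != null) {
            batchDeletedIndexes.remove(1L);
        }
        System.out.println("no NPE: guard is on the field, not the flag");
    }
}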
@@ -18,6 +18,8 @@
*/
package org.apache.bookkeeper.mledger.impl;

import javax.annotation.Nullable;

/**
* Interface to manage the ackSet state attached to a position.
* Helpers in {@link AckSetStateUtil} to create positions with
@@ -28,7 +30,7 @@ public interface AckSetState {
* Get the ackSet bitset information encoded as a long array.
* @return the ackSet
*/
long[] getAckSet();
@Nullable long[] getAckSet();

/**
* Set the ackSet bitset information as a long array.
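With getAckSet() now annotated @Nullable, callers are expected to tolerate a missing ack set instead of dereferencing it blindly. The sketch below is only an illustration of that calling pattern: the nested interface is a stand-in for AckSetState (not part of the patch), and the javax.annotation.Nullable annotation is omitted so the example needs nothing beyond the JDK.

import java.util.BitSet;

class NullableAckSetSketch {
    // Stand-in for AckSetState#getAckSet(): may return null when no ack set is attached.
    interface AckSetWords {
        long[] getAckSet();
    }

    static BitSet toBitSetOrNull(AckSetWords state) {
        long[] words = state.getAckSet();
        return words == null ? null : BitSet.valueOf(words); // propagate null, never dereference it
    }

    public static void main(String[] args) {
        System.out.println(toBitSetOrNull(() -> null));               // prints: null
        System.out.println(toBitSetOrNull(() -> new long[]{0b101L})); // prints: {0, 2}
    }
}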
@@ -35,13 +35,14 @@
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -61,6 +62,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nullable;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
@@ -105,7 +108,6 @@
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
@@ -200,7 +202,9 @@ public class ManagedCursorImpl implements ManagedCursor {

// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
protected final ConcurrentSkipListMap<Position, BitSetRecyclable> batchDeletedIndexes;
@Getter
@VisibleForTesting
@Nullable protected final ConcurrentSkipListMap<Position, BitSet> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
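The field's value type changes from the pooled BitSetRecyclable to java.util.BitSet, so the explicit recycle() calls removed later in this diff are no longer needed, and conversion to and from the persisted long[] form goes through the standard BitSet API. A small worked example of that round trip and of the copy used when duplicating a cursor; the literal values are illustrative only.

import java.util.BitSet;

class BitSetRoundTripSketch {
    public static void main(String[] args) {
        long[] words = {0b1101L};                 // persisted ack-set words, e.g. recovered from metadata
        BitSet bits = BitSet.valueOf(words);      // replaces BitSetRecyclable.create().resetWords(words)
        System.out.println(bits);                 // {0, 2, 3}

        long[] back = bits.toLongArray();         // the long[] form handed back to callers
        System.out.println(back.length == 1 && back[0] == 0b1101L); // true

        BitSet copy = (BitSet) bits.clone();      // replaces BitSetRecyclable.valueOf(...) when copying
        copy.clear(0);
        System.out.println(bits.get(0));          // true: the clone is independent, and nothing to recycle()
    }
}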
@@ -709,6 +713,7 @@ private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> i

private void recoverBatchDeletedIndexes (
List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
Objects.requireNonNull(batchDeletedIndexes);
lock.writeLock().lock();
try {
this.batchDeletedIndexes.clear();
@@ -720,8 +725,7 @@ }
}
this.batchDeletedIndexes.put(
PositionFactory.create(batchDeletedIndexInfo.getPosition().getLedgerId(),
batchDeletedIndexInfo.getPosition().getEntryId()),
BitSetRecyclable.create().resetWords(array));
batchDeletedIndexInfo.getPosition().getEntryId()), BitSet.valueOf(array));
}
});
} finally {
@@ -1409,14 +1413,12 @@ public void operationComplete() {
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
? getProperties() : Collections.emptyMap(), null, null);
individualDeletedMessages.clear();
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
if (batchDeletedIndexes != null) {
batchDeletedIndexes.clear();
AckSetStateUtil.maybeGetAckSetState(newReadPosition).ifPresent(ackSetState -> {
long[] resetWords = ackSetState.getAckSet();
if (resetWords != null) {
BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
batchDeletedIndexes.put(newReadPosition, ackSet);
batchDeletedIndexes.put(newReadPosition, BitSet.valueOf(resetWords));
}
});
}
@@ -2045,47 +2047,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
}

Position newPosition = position;

Optional<AckSetState> ackSetStateOptional = AckSetStateUtil.maybeGetAckSetState(newPosition);
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (ackSetStateOptional.isPresent()) {
AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
BitSetRecyclable givenBitSet =
BitSetRecyclable.create().resetWords(ackSetStateOptional.map(AckSetState::getAckSet).get());
// In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
// only update batchDeletedIndexes when the submitted batch index is greater
// than the recorded index.
batchDeletedIndexes.compute(newPosition,
(k, v) -> {
if (v == null) {
return givenBitSet;
}
if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
bitSetRecyclable.set(v);
return givenBitSet;
} else {
bitSetRecyclable.set(givenBitSet);
return v;
}
});
if (bitSetRecyclable.get() != null) {
bitSetRecyclable.get().recycle();
}
newPosition = ledger.getPreviousPosition(newPosition);
}
Map<Position, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionFactory.EARLIEST, newPosition);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
} else {
if (ackSetStateOptional.isPresent()) {
AckSetState ackSetState = ackSetStateOptional.get();
if (ackSetState.getAckSet() != null) {
newPosition = ledger.getPreviousPosition(newPosition);
}
}
}

Position newPosition = ackBatchPosition(position);
if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
boolean shouldCursorMoveForward = false;
try {
@@ -2131,6 +2093,31 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
internalAsyncMarkDelete(newPosition, properties, callback, ctx);
}

private Position ackBatchPosition(Position position) {
return AckSetStateUtil.maybeGetAckSetState(position)
.map(AckSetState::getAckSet)
.map(ackSet -> {
if (batchDeletedIndexes == null) {
return ledger.getPreviousPosition(position);
}
// In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
// only update batchDeletedIndexes when the submitted batch index is greater
// than the recorded index.
final var givenBitSet = BitSet.valueOf(ackSet);
batchDeletedIndexes.compute(position, (k, v) -> {
if (v == null || givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
return givenBitSet;
} else {
return v;
}
});
final var newPosition = ledger.getPreviousPosition(position);
batchDeletedIndexes.subMap(PositionFactory.EARLIEST, newPosition).clear();
return newPosition;
})
.orElse(position);
}
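The compute call in ackBatchPosition keeps whichever bitset records more progress (a higher first set bit, i.e. a higher first still-unacked batch index), so a late or duplicate ack with a smaller batch index cannot roll the recorded state back. A standalone worked example of that comparison rule; positions are replaced by a plain Long key purely for illustration.

import java.util.BitSet;
import java.util.concurrent.ConcurrentSkipListMap;

class RollbackGuardSketch {
    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, BitSet> indexes = new ConcurrentSkipListMap<>();

        BitSet recorded = new BitSet();
        recorded.set(3, 10);                       // batch indexes 0..2 already acked -> first set bit is 3
        indexes.put(7L, recorded);

        BitSet stale = new BitSet();
        stale.set(1, 10);                          // stale ack: only index 0 acked -> first set bit is 1

        indexes.compute(7L, (k, v) ->
                (v == null || stale.nextSetBit(0) > v.nextSetBit(0)) ? stale : v);

        System.out.println(indexes.get(7L).nextSetBit(0)); // 3: the stale ack did not roll progress back
    }
}

The subsequent subMap(PositionFactory.EARLIEST, newPosition).clear() then drops per-batch state for all positions strictly before the new mark-delete candidate.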

protected void internalAsyncMarkDelete(final Position newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
ledger.mbean.addMarkDeleteOp();
@@ -2236,12 +2223,10 @@ public void operationComplete() {
try {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
Map<Position, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionFactory.EARLIEST,
if (batchDeletedIndexes != null) {
batchDeletedIndexes.subMap(PositionFactory.EARLIEST,
false, PositionFactory.create(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId()), true);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
mdEntry.newPosition.getEntryId()), true).clear();
}
persistentMarkDeletePosition = mdEntry.newPosition;
} finally {
@@ -2376,11 +2361,8 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}

if (internalIsMessageDeleted(position)) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
if (batchDeletedIndexes != null) {
batchDeletedIndexes.remove(position);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
@@ -2389,11 +2371,8 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position);
if (ackSet == null) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
if (batchDeletedIndexes != null) {
batchDeletedIndexes.remove(position);
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
// make the RangeSet recognize the "continuity" between adjacent Positions.
@@ -2406,23 +2385,19 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
individualDeletedMessages);
}
} else if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSet);
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
} else if (batchDeletedIndexes != null) {
final var givenBitSet = BitSet.valueOf(ackSet);
final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet);
if (givenBitSet != bitSet) {
bitSet.and(givenBitSet);
givenBitSet.recycle();
}
if (bitSet.isEmpty()) {
Position previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
batchDeletedIndexes.remove(position);
}
}
}
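In the branch above, each incoming ack set is intersected with the recorded one via and(); once no bits remain set, the entry is treated as fully acknowledged and its per-batch state is dropped. A self-contained illustration of that step follows; the key and the bit patterns are made up for the example.

import java.util.BitSet;
import java.util.concurrent.ConcurrentSkipListMap;

class BatchAckIntersectionSketch {
    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, BitSet> indexes = new ConcurrentSkipListMap<>();

        BitSet first = BitSet.valueOf(new long[]{0b1100L});  // indexes 2,3 still un-acked
        BitSet bitSet = indexes.computeIfAbsent(5L, k -> first);
        System.out.println(bitSet);                           // {2, 3}

        BitSet second = BitSet.valueOf(new long[]{0b0011L});  // a later ack leaves only 0,1 un-acked
        if (second != bitSet) {
            bitSet.and(second);                               // intersect: nothing left un-acked
        }
        if (bitSet.isEmpty()) {
            indexes.remove(5L);                               // entry fully acked -> drop its batch state
        }
        System.out.println(indexes.containsKey(5L));          // false
    }
}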
@@ -3213,17 +3188,17 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) {
if (batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();
MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats
.BatchedEntryDeletionIndexInfo.newBuilder();
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = new ArrayList<>();
Iterator<Map.Entry<Position, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
final var iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < getConfig().getMaxBatchDeletedIndexToPersist()) {
Map.Entry<Position, BitSetRecyclable> entry = iterator.next();
final var entry = iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
@@ -3643,11 +3618,11 @@ private boolean internalIsMessageDeleted(Position position) {
@Override
public long[] getBatchPositionAckSet(Position position) {
if (batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.get(position);
if (bitSetRecyclable == null) {
final var bitSet = batchDeletedIndexes.get(position);
if (bitSet == null) {
return null;
} else {
return bitSetRecyclable.toLongArray();
return bitSet.toLongArray();
}
} else {
return null;
@@ -3750,8 +3725,8 @@ private ManagedCursorImpl cursorImpl() {

@Override
public long[] getDeletedBatchIndexesAsLongArray(Position position) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
if (batchDeletedIndexes != null) {
final var bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
} else {
return null;
@@ -3879,9 +3854,9 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro
lock.readLock().unlock();
}
if (batchDeletedIndexes != null) {
for (Map.Entry<Position, BitSetRecyclable> entry : this.batchDeletedIndexes.entrySet()) {
BitSetRecyclable copiedBitSet = BitSetRecyclable.valueOf(entry.getValue());
newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), copiedBitSet);
Objects.requireNonNull(newNonDurableCursor.batchDeletedIndexes);
for (final var entry : this.batchDeletedIndexes.entrySet()) {
newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), (BitSet) entry.getValue().clone());
}
}
return newNonDurableCursor;
@@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -45,7 +44,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -223,10 +221,7 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception {
(LinkedMap<TxnID, HashMap<Position, Position>>) field.get(pendingAckHandle);
assertTrue(individualAckOfTransaction.isEmpty());
managedCursor = (ManagedCursorImpl) testPersistentSubscription.getCursor();
field = ManagedCursorImpl.class.getDeclaredField("batchDeletedIndexes");
field.setAccessible(true);
final ConcurrentSkipListMap<Position, BitSetRecyclable> batchDeletedIndexes =
(ConcurrentSkipListMap<Position, BitSetRecyclable>) field.get(managedCursor);
final var batchDeletedIndexes = managedCursor.getBatchDeletedIndexes();
if (retryCnt == 0) {
//one message are not ack
Awaitility.await().until(() -> {
@@ -73,6 +73,7 @@ public class MLPendingAckStoreTest extends TransactionTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, 1, NAMESPACE1 + "/test", 0);
}

@@ -304,4 +305,4 @@ private LinkedHashSet<Long> calculatePendingAckIndexes(List<Long> positionList,
}
return indexes;
}
}
}
