Skip to content

Commit

Permalink
[fix] [ml] Fix cursor metadata compatability issue when switching the…
Browse files Browse the repository at this point in the history
… config unackedRangesOpenCacheSetEnabled (#23759)
  • Loading branch information
poorbarcode authored Jan 20, 2025
1 parent 8015795 commit 4ee4633
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private String shadowSourceName;
@Getter
private boolean persistIndividualAckAsLongArray;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand All @@ -103,6 +105,11 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) {
return this;
}

public ManagedLedgerConfig setPersistIndividualAckAsLongArray(boolean persistIndividualAckAsLongArray) {
this.persistIndividualAckAsLongArray = persistIndividualAckAsLongArray;
return this;
}

/**
* @return the lazyCursorRecovery
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public class ManagedCursorImpl implements ManagedCursor {
protected volatile long messagesConsumedCounter;

// Current ledger used to append the mark-delete position
private volatile LedgerHandle cursorLedger;
@VisibleForTesting
volatile LedgerHandle cursorLedger;

// Wether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
Expand Down Expand Up @@ -643,7 +644,22 @@ public void recoverIndividualDeletedMessages(PositionInfo positionInfo) {
try {
Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
individualDeletedMessages.build(rangeMap);
// Guarantee compatability for the config "unackedRangesOpenCacheSetEnabled".
if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
individualDeletedMessages.build(rangeMap);
} else {
RangeSetWrapper<Position> rangeSetWrapperV2 = new RangeSetWrapper<>(positionRangeConverter,
positionRangeReverseConverter, true,
getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled());
rangeSetWrapperV2.build(rangeMap);
rangeSetWrapperV2.forEach(range -> {
individualDeletedMessages.addOpenClosed(range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(), range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
rangeSetWrapperV2.clear();
}
} catch (Exception e) {
log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(),
name, e);
Expand Down Expand Up @@ -2376,7 +2392,14 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
// make the RangeSet recognize the "continuity" between adjacent Positions.
Position previousPosition = ledger.getPreviousPosition(position);
// Before https://github.com/apache/pulsar/pull/21105 is merged, the range does not support crossing
// multi ledgers, so the first position's entryId maybe "-1".
Position previousPosition;
if (position.getEntryId() == 0) {
previousPosition = PositionFactory.create(position.getLedgerId(), -1);
} else {
previousPosition = ledger.getPreviousPosition(position);
}
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
Expand Down Expand Up @@ -3225,10 +3248,21 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
.addAllProperties(buildPropertiesMap(mdEntry.properties));

Map<Long, long[]> internalRanges = null;
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
/**
* Cursor will create the {@link #individualDeletedMessages} typed {@link LongPairRangeSet.DefaultRangeSet} if
* disabled the config {@link ManagedLedgerConfig#unackedRangesOpenCacheSetEnabled}.
* {@link LongPairRangeSet.DefaultRangeSet} never implemented the methods below:
* - {@link LongPairRangeSet#toRanges(int)}, which is used to serialize cursor metadata.
* - {@link LongPairRangeSet#build(Map)}, which is used to deserialize cursor metadata.
* Do not enable the feature that https://github.com/apache/pulsar/pull/9292 introduced, to avoid serialization
* and deserialization error.
*/
if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) {
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
}
}
if (internalRanges != null && !internalRanges.isEmpty()) {
piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;
import org.roaringbitmap.RoaringBitSet;
Expand All @@ -40,7 +38,6 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe

private final LongPairRangeSet<T> rangeSet;
private final LongPairConsumer<T> rangeConverter;
private final ManagedLedgerConfig config;
private final boolean enableMultiEntry;

/**
Expand All @@ -53,13 +50,19 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
RangeBoundConsumer<T> rangeBoundConsumer,
ManagedCursorImpl managedCursor) {
requireNonNull(managedCursor);
this.config = managedCursor.getManagedLedger().getConfig();
this(rangeConverter, rangeBoundConsumer, managedCursor.getConfig().isUnackedRangesOpenCacheSetEnabled(),
managedCursor.getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled());
}

public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
RangeBoundConsumer<T> rangeBoundConsumer,
boolean unackedRangesOpenCacheSetEnabled,
boolean persistentUnackedRangesWithMultipleEntriesEnabled) {
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
this.rangeSet = unackedRangesOpenCacheSetEnabled
? new OpenLongPairRangeSet<>(rangeConverter, RoaringBitSet::new)
: new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
this.enableMultiEntry = persistentUnackedRangesWithMultipleEntriesEnabled;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,27 @@

import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand All @@ -49,18 +54,22 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import lombok.Cleanup;

@Slf4j
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {

private final ObjectMapper jackson = new ObjectMapper();

public ManagedLedgerBkTest() {
super(2);
}
Expand Down Expand Up @@ -590,44 +599,114 @@ public void testPeriodicRollover() throws Exception {
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
}

@DataProvider(name = "unackedRangesOpenCacheSetEnabledPair")
public Object[][] unackedRangesOpenCacheSetEnabledPair() {
return new Object[][]{
{false, true},
{true, false},
{true, true},
{false, false}
};
}

/**
* This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger.
*
* @throws Exception
*/
@Test
public void testUnackmessagesAndRecovery() throws Exception {
@Test(dataProvider = "unackedRangesOpenCacheSetEnabledPair")
public void testUnackmessagesAndRecoveryCompatibility(boolean enabled1, boolean enabled2) throws Exception {
final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", "");
final String cursorName = "c1";
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);

ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
final ManagedLedgerConfig config1 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1)
.setUnackedRangesOpenCacheSetEnabled(enabled1);
final ManagedLedgerConfig config2 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my_test_unack_messages", config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1)
.setUnackedRangesOpenCacheSetEnabled(enabled2);

ManagedLedger ledger1 = factory.open(mlName, config1);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName);

int totalEntries = 100;
for (int i = 0; i < totalEntries; i++) {
Position p = ledger.addEntry("entry".getBytes());
Position p = ledger1.addEntry("entry".getBytes());
if (i % 2 == 0) {
cursor.delete(p);
cursor1.delete(p);
}
}
log.info("ack ranges: {}", cursor1.getIndividuallyDeletedMessagesSet().size());

LongPairRangeSet<Position> unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet();
// reopen and recover cursor
ledger1.close();
ManagedLedger ledger2 = factory.open(mlName, config2);
ManagedCursorImpl cursor2 = (ManagedCursorImpl) ledger2.openCursor(cursorName);

ledger.close();
log.info("before: {}", cursor1.getIndividuallyDeletedMessagesSet().asRanges());
log.info("after : {}", cursor2.getIndividuallyDeletedMessagesSet().asRanges());
assertEquals(cursor1.getIndividuallyDeletedMessagesSet().asRanges(), cursor2.getIndividuallyDeletedMessagesSet().asRanges());
assertEquals(cursor1.markDeletePosition, cursor2.markDeletePosition);

ledger2.close();
factory.shutdown();
}

// open and recover cursor
ledger = factory.open("my_test_unack_messages", config);
cursor = (ManagedCursorImpl) ledger.openCursor("c1");
@DataProvider(name = "booleans")
public Object[][] booleans() {
return new Object[][] {
{true},
{false},
};
}

LongPairRangeSet<Position> unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet();
assertTrue(unackMessagesBefore.equals(unackMessagesAfter));
@Test(dataProvider = "booleans")
public void testConfigPersistIndividualAckAsLongArray(boolean enable) throws Exception {
final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", "");
final String cursorName = "c1";
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
final ManagedLedgerConfig config = new ManagedLedgerConfig()
.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1)
.setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1)
.setUnackedRangesOpenCacheSetEnabled(true).setPersistIndividualAckAsLongArray(enable);

ledger.close();
ManagedLedger ledger1 = factory.open(mlName, config);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName);

// Write entries.
int totalEntries = 100;
List<Position> entries = new ArrayList<>();
for (int i = 0; i < totalEntries; i++) {
Position p = ledger1.addEntry("entry".getBytes());
entries.add(p);
}
// Make ack holes and trigger a mark deletion.
for (int i = totalEntries - 1; i >=0 ; i--) {
if (i % 2 == 0) {
cursor1.delete(entries.get(i));
}
}
cursor1.markDelete(entries.get(9));
Awaitility.await().untilAsserted(() -> {
assertEquals(cursor1.pendingMarkDeleteOps.size(), 0);
});

// Verify: the config affects.
long cursorLedgerLac = cursor1.cursorLedger.getLastAddConfirmed();
LedgerEntry ledgerEntry = cursor1.cursorLedger.readEntries(cursorLedgerLac, cursorLedgerLac).nextElement();
MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry());
if (enable) {
assertNotEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0);
} else {
assertEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0);
}

// cleanup
ledger1.close();
factory.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " will only be tracked in memory and messages will be redelivered in case of"
+ " crashes.")
private int managedLedgerMaxUnackedRangesToPersist = 10000;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
private boolean managedLedgerPersistIndividualAckAsLongArray = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,8 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T

managedLedgerConfig
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
Expand Down
Loading

0 comments on commit 4ee4633

Please sign in to comment.