Skip to content

Commit

Permalink
add code.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed Nov 19, 2024
1 parent 2715853 commit b8af7ea
Showing 1 changed file with 54 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.longlong.Roaring64Bitmap;

@Slf4j
public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {

protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
// timestamp -> ledgerId -> entryId
protected final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> priorityQueue
= new Long2ObjectAVLTreeMap<>();

// If we detect that all messages have fixed delay time, such that the delivery is
// always going to be in FIFO order, then we can avoid pulling all the messages in
Expand All @@ -52,12 +59,16 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
// Track whether we have seen all messages with fixed delay so far.
private boolean messagesHaveFixedDelay = true;

//
private int timestampPrecisionBitCnt = 8;

InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
fixedDelayDetectionLookahead);
this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis);
}

public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
Expand All @@ -66,6 +77,35 @@ public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsum
long fixedDelayDetectionLookahead) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis);
}

/**
* The tick time is used to determine the precision of the delivery time. As the redelivery time
* is not accurate, we can bucket the delivery time and group multiple message ids into the same
* bucket to reduce the memory usage. THe default value is 1 second, which means we accept 1 second
* deviation for the delivery time, so that we can trim the lower 9 bits of the delivery time, because
* 2**9ms = 512ms < 1s, 2**10ms = 1024ms > 1s.
* @param tickTimeMillis
* @return
*/
private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
int bitCnt = 0;
while (tickTimeMillis > 0) {
tickTimeMillis >>= 1;
bitCnt++;
}
return bitCnt-1;
}

/**
* trim the lower bits of the timestamp to reduce the memory usage.
* @param timestamp
* @param bits
* @return
*/
private static long trimLowerBit(long timestamp, int bits) {
return timestamp & (-1L << bits);
}

@Override
Expand All @@ -80,6 +120,19 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
deliverAt - clock.millis());
}

long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
if (priorityQueue.containsKey(timestamp)) {
Long2ObjectMap<Roaring64Bitmap> ledgerMap = priorityQueue.get(timestamp);
Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
entryIds.add(entryId);
} else {
Roaring64Bitmap entryIds = new Roaring64Bitmap();
entryIds.add(entryId);
Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectAVLTreeMap<>();
ledgerMap.put(ledgerId, entryIds);
priorityQueue.put(timestamp, ledgerMap);
}

priorityQueue.add(deliverAt, ledgerId, entryId);
updateTimer();

Expand Down

0 comments on commit b8af7ea

Please sign in to comment.