Skip to content

Commit

Permalink
add code.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed Nov 19, 2024
1 parent b8af7ea commit 932b841
Showing 1 changed file with 50 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@

import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -121,19 +124,9 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
}

long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
if (priorityQueue.containsKey(timestamp)) {
Long2ObjectMap<Roaring64Bitmap> ledgerMap = priorityQueue.get(timestamp);
Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
entryIds.add(entryId);
} else {
Roaring64Bitmap entryIds = new Roaring64Bitmap();
entryIds.add(entryId);
Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectAVLTreeMap<>();
ledgerMap.put(ledgerId, entryIds);
priorityQueue.put(timestamp, ledgerMap);
}

priorityQueue.add(deliverAt, ledgerId, entryId);
priorityQueue.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
updateTimer();

checkAndUpdateHighest(deliverAt);
Expand All @@ -158,7 +151,7 @@ private void checkAndUpdateHighest(long deliverAt) {
*/
@Override
public boolean hasMessageAvailable() {
boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime();
boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.firstLongKey() <= getCutoffTime();
if (!hasMessageAvailable) {
updateTimer();
}
Expand All @@ -175,17 +168,40 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
long cutoffTime = getCutoffTime();

while (n > 0 && !priorityQueue.isEmpty()) {
long timestamp = priorityQueue.peekN1();
long timestamp = priorityQueue.firstLongKey();
if (timestamp > cutoffTime) {
break;
}

long ledgerId = priorityQueue.peekN2();
long entryId = priorityQueue.peekN3();
positions.add(PositionFactory.create(ledgerId, entryId));

priorityQueue.pop();
--n;
LongSet ledgerIdToDelete = new LongOpenHashSet();
Long2ObjectMap<Roaring64Bitmap> ledgerMap = priorityQueue.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entryIds = ledgerEntry.getValue();
if (entryIds.getLongCardinality() <= n) {
entryIds.forEach(entryId -> {
positions.add(PositionFactory.create(ledgerId, entryId));
});
n -= (int) entryIds.getLongCardinality();
ledgerIdToDelete.add(ledgerId);
} else {
long[] entryIdsArray = entryIds.toArray();
for (int i = 0; i < n; i++) {
positions.add(PositionFactory.create(ledgerId, entryIdsArray[i]));
entryIds.removeLong(entryIdsArray[i]);
}
n = 0;
}
if (n <= 0) {
break;
}
}
for (long ledgerId : ledgerIdToDelete) {
ledgerMap.remove(ledgerId);
}
if (ledgerMap.isEmpty()) {
priorityQueue.remove(timestamp);
}
}

if (log.isDebugEnabled()) {
Expand All @@ -210,18 +226,27 @@ public CompletableFuture<Void> clear() {

@Override
public long getNumberOfDelayedMessages() {
return priorityQueue.size();
return priorityQueue.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongCardinality).sum()).sum();
}

/**
* This method rely on Roaring64Bitmap::getLongSizeInBytes to calculate the memory usage of the buffer.
* The memory usage of the buffer is not accurate, because Roaring64Bitmap::getLongSizeInBytes will
* overestimate the memory usage of the buffer a lot.
* @return the memory usage of the buffer
*/
@Override
public long getBufferMemoryUsage() {
return priorityQueue.bytesCapacity();
return priorityQueue.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongSizeInBytes).sum()).sum();
}

@Override
public void close() {
super.close();
priorityQueue.close();
}

@Override
Expand All @@ -234,6 +259,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead
}

protected long nextDeliveryTime() {
return priorityQueue.peekN1();
return priorityQueue.firstLongKey();
}
}

0 comments on commit 932b841

Please sign in to comment.