Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single buffer for small add requests #3783

Merged
merged 17 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;

import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -403,17 +405,24 @@ public void readComplete(int rc, LedgerHandle lh,
numEntriesRead.inc();
numBytesRead.registerSuccessfulValue(dataLength);

ByteBufList toSend = lh.getDigestManager()
ReferenceCounted toSend = lh.getDigestManager()
.computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
Unpooled.wrappedBuffer(data, 0, data.length),
lh.getLedgerKey(),
0
);
if (replicationThrottle != null) {
updateAverageEntrySize(toSend.readableBytes());
if (toSend instanceof ByteBuf) {
updateAverageEntrySize(((ByteBuf) toSend).readableBytes());
} else if (toSend instanceof ByteBufList) {
updateAverageEntrySize(((ByteBufList) toSend).readableBytes());
}
}
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
lh.getLedgerKey(), entryId, ByteBufList.clone(toSend),
lh.getLedgerKey(), entryId, toSend,
multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD,
false, WriteFlag.NONE);
writeDataLatency.registerSuccessfulEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
Expand All @@ -38,7 +39,6 @@
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,7 +56,7 @@ class PendingAddOp implements Runnable, WriteCallback {
private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);

ByteBuf payload;
ByteBufList toSend;
ReferenceCounted toSend;
AddCallbackWithLatency cb;
Object ctx;
long entryId;
Expand Down Expand Up @@ -257,9 +257,10 @@ public void run() {
checkNotNull(lh);
checkNotNull(lh.macManager);

int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
this.toSend = lh.macManager.computeDigestAndPackageForSending(
entryId, lh.lastAddConfirmed, currentLedgerLength,
payload);
payload, lh.ledgerKey, flags);
// ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
payload = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME;

import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -39,6 +40,7 @@
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -358,8 +360,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} else {
waitingForAuth.add(msg);
}
} else if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
waitingForAuth.add(msg);
} else {
LOG.info("dropping write of message {}", msg);
LOG.info("[{}] dropping write of message {}", ctx.channel(), msg);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.proto;

import io.netty.util.ReferenceCounted;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -139,7 +140,7 @@ void writeLac(BookieId address, long ledgerId, byte[] masterKey,
* {@link org.apache.bookkeeper.client.api.WriteFlag}
*/
void addEntry(BookieId address, long ledgerId, byte[] masterKey,
long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx,
int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.EnumSet;
Expand Down Expand Up @@ -301,7 +302,7 @@ public void addEntry(final BookieId addr,
final long ledgerId,
final byte[] masterKey,
final long entryId,
final ByteBufList toSend,
final ReferenceCounted toSend,
final WriteCallback cb,
final Object ctx,
final int options,
Expand Down Expand Up @@ -370,7 +371,7 @@ private static class ChannelReadyForAddEntryCallback
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;

private BookieClientImpl bookieClient;
private ByteBufList toSend;
private ReferenceCounted toSend;
private long ledgerId;
private long entryId;
private BookieId addr;
Expand All @@ -382,7 +383,7 @@ private static class ChannelReadyForAddEntryCallback
private EnumSet<WriteFlag> writeFlags;

static ChannelReadyForAddEntryCallback create(
BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
BookieClientImpl bookieClient, ReferenceCounted toSend, long ledgerId,
long entryId, BookieId addr, Object ctx,
WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
EnumSet<WriteFlag> writeFlags) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (LOG.isTraceEnabled()) {
LOG.trace("Encode request {} to channel {}.", msg, ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Request) {
if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
ctx.write(msg, promise);
} else if (msg instanceof BookkeeperProtocol.Request) {
ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Request) {
ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
Expand Down Expand Up @@ -771,7 +772,7 @@ void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
* @param writeFlags
* WriteFlags
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ReferenceCounted toSend, WriteCallback cb,
Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
Expand All @@ -784,9 +785,12 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
return;
}
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
request = BookieProtocol.AddRequest.create(
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
(short) options, masterKey, toSend);

if (toSend instanceof ByteBuf) {
request = ((ByteBuf) toSend).retainedDuplicate();
} else {
request = ByteBufList.clone((ByteBufList) toSend);
}
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
Expand All @@ -801,11 +805,14 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
}

ByteString body = null;
if (toSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
ByteBufList bufToSend = (ByteBufList) toSend;

if (bufToSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(),
bufToSend.readableBytes());
} else {
for (int i = 0; i < toSend.size(); i++) {
ByteString piece = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer());
for (int i = 0; i < bufToSend.size(); i++) {
ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
// use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs
body = (body == null) ? piece : body.concat(piece);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookieProtoEncoding;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,14 +99,61 @@ public static byte[] generateMasterKey(byte[] password) throws NoSuchAlgorithmEx
* @param data
* @return
*/
public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length,
ByteBuf data) {
ByteBuf headersBuffer;
public ReferenceCounted computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length,
ByteBuf data, byte[] masterKey, int flags) {
if (this.useV2Protocol) {
headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
return computeDigestAndPackageForSendingV2(entryId, lastAddConfirmed, length, data, masterKey, flags);
} else {
headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength);
return computeDigestAndPackageForSendingV3(entryId, lastAddConfirmed, length, data);
}
}

private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long lastAddConfirmed, long length,
ByteBuf data, byte[] masterKey, int flags) {
boolean isSmallEntry = data.readableBytes() < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;

int headersSize = 4 // Request header
+ BookieProtocol.MASTER_KEY_LENGTH // for the master key
+ METADATA_LENGTH //
+ macCodeLength;
int payloadSize = data.readableBytes();
int bufferSize = 4 + headersSize + (isSmallEntry ? payloadSize : 0);

ByteBuf buf = allocator.buffer(bufferSize, bufferSize);
buf.writeInt(headersSize + payloadSize);
buf.writeInt(
BookieProtocol.PacketHeader.toInt(
BookieProtocol.CURRENT_PROTOCOL_VERSION, BookieProtocol.ADDENTRY, (short) flags));
buf.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);

// The checksum is computed on the next part of the buffer only
buf.readerIndex(buf.writerIndex());
buf.writeLong(ledgerId);
buf.writeLong(entryId);
buf.writeLong(lastAddConfirmed);
buf.writeLong(length);

// Compute checksum over the headers
update(buf);
update(data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to unwrap the data ByteBuf if it is CompositeByteBuf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for v2 protocol, we shouldn't be getting any ComposityByteBuf

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When Pulsar enabled KOP AppendIndexMetadataInterceptor, the payload will be CompositeByteBuf, and the update(data) will copy data to the heap. Please refer to: #2701


populateValueAndReset(buf);

// Reset the reader index to the beginning
buf.readerIndex(0);

if (isSmallEntry) {
buf.writeBytes(data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buf.writeBytes(unwrap) ?

data.release();
return buf;
} else {
return ByteBufList.get(buf, data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteBufList.get(buf, unwrap)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}

private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastAddConfirmed, long length,
ByteBuf data) {
ByteBuf headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength);
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(entryId);
headersBuffer.writeLong(lastAddConfirmed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.proto.MockBookieClient;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Versioned;

/**
Expand All @@ -48,8 +48,8 @@ public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddCo
int offset, int len) throws GeneralSecurityException {
DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32,
UnpooledByteBufAllocator.DEFAULT, true);
return ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
Unpooled.wrappedBuffer(data, offset, len)));
return MockBookieClient.copyData(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
Unpooled.wrappedBuffer(data, offset, len), new byte[20], 0));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
package org.apache.bookkeeper.client;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.proto.MockBookieClient;

/**
* Adapter for tests to get the public access from LedgerHandle for its default
* scope.
*/
public class LedgerHandleAdapter {

public static ByteBufList toSend(LedgerHandle lh, long entryId, ByteBuf data) {
return lh.getDigestManager().computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(),
lh.addToLength(data.readableBytes()), data);
public static ByteBuf toSend(LedgerHandle lh, long entryId, ByteBuf data) {
return MockBookieClient.copyData(lh.getDigestManager()
.computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(),
lh.addToLength(data.readableBytes()), data, new byte[20], 0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.ReferenceCounted;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.MockBookieClient;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
Expand Down Expand Up @@ -504,10 +506,10 @@ protected void setupBookieClientReadEntry() {

if (mockEntry != null) {
LOG.info("readEntry - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress);
ByteBufList entry = macManager.computeDigestAndPackageForSending(entryId,
ReferenceCounted entry = macManager.computeDigestAndPackageForSending(entryId,
mockEntry.lastAddConfirmed, mockEntry.payload.length,
Unpooled.wrappedBuffer(mockEntry.payload));
callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, ByteBufList.coalesce(entry),
Unpooled.wrappedBuffer(mockEntry.payload), new byte[20], 0);
callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, MockBookieClient.copyData(entry),
args[4]);
entry.release();
} else {
Expand Down
Loading