From 11d5854c18fd91602a0c9a555de3fce66752f545 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 19 Oct 2022 09:32:58 -0700 Subject: [PATCH 01/13] Single buffer for small add requests --- .../client/LedgerFragmentReplicator.java | 17 ++++-- .../bookkeeper/client/PendingAddOp.java | 6 +- .../apache/bookkeeper/proto/AuthHandler.java | 6 +- .../apache/bookkeeper/proto/BookieClient.java | 3 +- .../bookkeeper/proto/BookieClientImpl.java | 7 ++- .../bookkeeper/proto/BookieProtoEncoding.java | 4 +- .../proto/PerChannelBookieClient.java | 25 +++++--- .../proto/checksum/DigestManager.java | 60 +++++++++++++++++-- .../apache/bookkeeper/client/ClientUtil.java | 6 +- .../client/LedgerHandleAdapter.java | 7 ++- .../client/MockBookKeeperTestCase.java | 8 ++- .../client/ParallelLedgerRecoveryTest.java | 6 +- .../ReadLastConfirmedAndEntryOpTest.java | 14 +++-- .../client/TestPendingReadLacOp.java | 11 +++- .../bookkeeper/proto/MockBookieClient.java | 14 +++-- .../apache/bookkeeper/proto/MockBookies.java | 4 +- 16 files changed, 147 insertions(+), 51 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index 877a3ac300a..6b439b0960d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -29,7 +29,9 @@ import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY; import com.google.common.util.concurrent.RateLimiter; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import java.util.Enumeration; import java.util.HashSet; import java.util.Iterator; @@ -403,17 +405,24 @@ public void readComplete(int rc, LedgerHandle lh, numEntriesRead.inc(); numBytesRead.registerSuccessfulValue(dataLength); - ByteBufList toSend = lh.getDigestManager() + ReferenceCounted toSend = lh.getDigestManager() .computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), entry.getLength(), - Unpooled.wrappedBuffer(data, 0, data.length)); + Unpooled.wrappedBuffer(data, 0, data.length), + lh.getLedgerKey(), + 0 + ); if (replicationThrottle != null) { - updateAverageEntrySize(toSend.readableBytes()); + if (toSend instanceof ByteBuf) { + updateAverageEntrySize(((ByteBuf) toSend).readableBytes()); + } else if (toSend instanceof ByteBufList) { + updateAverageEntrySize(((ByteBufList) toSend).readableBytes()); + } } for (BookieId newBookie : newBookies) { long startWriteEntryTime = MathUtils.nowInNano(); bkc.getBookieClient().addEntry(newBookie, lh.getId(), - lh.getLedgerKey(), entryId, ByteBufList.clone(toSend), + lh.getLedgerKey(), entryId, toSend, multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD, false, WriteFlag.NONE); writeDataLatency.registerSuccessfulEvent( diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index d0ff59e45c6..87954179b0b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -27,6 +27,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -56,7 +57,7 @@ class PendingAddOp implements Runnable, WriteCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); ByteBuf payload; - ByteBufList toSend; + ReferenceCounted toSend; AddCallbackWithLatency cb; Object ctx; long entryId; @@ -257,9 +258,10 @@ public void run() { checkNotNull(lh); checkNotNull(lh.macManager); + int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE; this.toSend = lh.macManager.computeDigestAndPackageForSending( entryId, lh.lastAddConfirmed, currentLedgerLength, - payload); + payload, lh.ledgerKey, flags); // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending payload = null; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index ba5dc9948dc..f923b61ad50 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME; import com.google.protobuf.ByteString; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; @@ -39,6 +40,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; +import org.apache.bookkeeper.util.ByteBufList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -358,8 +360,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } else { waitingForAuth.add(msg); } + } else if (msg instanceof ByteBuf || msg instanceof ByteBufList) { + waitingForAuth.add(msg); } else { - LOG.info("dropping write of message {}", msg); + LOG.info("[{}] dropping write of message {}", ctx.channel(), msg); } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 81be386f7ca..938874fac04 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -20,6 +20,7 @@ */ package org.apache.bookkeeper.proto; +import io.netty.util.ReferenceCounted; import java.util.EnumSet; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -139,7 +140,7 @@ void writeLac(BookieId address, long ledgerId, byte[] masterKey, * {@link org.apache.bookkeeper.client.api.WriteFlag} */ void addEntry(BookieId address, long ledgerId, byte[] masterKey, - long entryId, ByteBufList toSend, WriteCallback cb, Object ctx, + long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx, int options, boolean allowFastFail, EnumSet writeFlags); /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index eaecb452820..8b5984a728b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -32,6 +32,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.util.EnumSet; @@ -301,7 +302,7 @@ public void addEntry(final BookieId addr, final long ledgerId, final byte[] masterKey, final long entryId, - final ByteBufList toSend, + final ReferenceCounted toSend, final WriteCallback cb, final Object ctx, final int options, @@ -370,7 +371,7 @@ private static class ChannelReadyForAddEntryCallback private final Handle recyclerHandle; private BookieClientImpl bookieClient; - private ByteBufList toSend; + private ReferenceCounted toSend; private long ledgerId; private long entryId; private BookieId addr; @@ -382,7 +383,7 @@ private static class ChannelReadyForAddEntryCallback private EnumSet writeFlags; static ChannelReadyForAddEntryCallback create( - BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId, + BookieClientImpl bookieClient, ReferenceCounted toSend, long ledgerId, long entryId, BookieId addr, Object ctx, WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail, EnumSet writeFlags) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 3e41a3f5ea7..6d4632471d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -428,7 +428,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (LOG.isTraceEnabled()) { LOG.trace("Encode request {} to channel {}.", msg, ctx.channel()); } - if (msg instanceof BookkeeperProtocol.Request) { + if (msg instanceof ByteBuf || msg instanceof ByteBufList) { + ctx.write(msg, promise); + } else if (msg instanceof BookkeeperProtocol.Request) { ctx.write(reqV3.encode(msg, ctx.alloc()), promise); } else if (msg instanceof BookieProtocol.Request) { ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 888077fe80b..a4a1e331aaf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -21,6 +21,7 @@ import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.ExtensionRegistry; @@ -62,6 +63,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; @@ -771,7 +773,7 @@ void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) { * @param writeFlags * WriteFlags */ - void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb, + void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx, final int options, boolean allowFastFail, final EnumSet writeFlags) { Object request = null; CompletionKey completionKey = null; @@ -784,9 +786,12 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf return; } completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY); - request = BookieProtocol.AddRequest.create( - BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, - (short) options, masterKey, toSend); + + if (toSend instanceof ByteBuf) { + request = ((ByteBuf) toSend).retainedDuplicate(); + } else { + request = ByteBufList.clone((ByteBufList) toSend); + } } else { final long txnId = getTxnId(); completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY); @@ -801,11 +806,15 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf } ByteString body = null; - if (toSend.hasArray()) { - body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes()); + Preconditions.checkArgument(toSend instanceof ByteBufList); + ByteBufList bufToSend = (ByteBufList) toSend; + + if (bufToSend.hasArray()) { + body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(), + bufToSend.readableBytes()); } else { - for (int i = 0; i < toSend.size(); i++) { - ByteString piece = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer()); + for (int i = 0; i < bufToSend.size(); i++) { + ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer()); // use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs body = (body == null) ? piece : body.concat(piece); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index 7b0f57a945a..d389a9c69c9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -22,10 +22,13 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.proto.BookieProtoEncoding; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType; import org.apache.bookkeeper.util.ByteBufList; import org.slf4j.Logger; @@ -96,14 +99,61 @@ public static byte[] generateMasterKey(byte[] password) throws NoSuchAlgorithmEx * @param data * @return */ - public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, - ByteBuf data) { - ByteBuf headersBuffer; + public ReferenceCounted computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, + ByteBuf data, byte[] masterKey, int flags) { if (this.useV2Protocol) { - headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength); + return computeDigestAndPackageForSendingV2(entryId, lastAddConfirmed, length, data, masterKey, flags); } else { - headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength); + return computeDigestAndPackageForSendingV3(entryId, lastAddConfirmed, length, data); } + } + + private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long lastAddConfirmed, long length, + ByteBuf data, byte[] masterKey, int flags) { + boolean isSmallEntry = data.readableBytes() < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD; + + int headersSize = 4 // Request header + + BookieProtocol.MASTER_KEY_LENGTH // for the master key + + METADATA_LENGTH // + + macCodeLength; + int payloadSize = data.readableBytes(); + int bufferSize = 4 + headersSize + (isSmallEntry ? payloadSize : 0); + + ByteBuf buf = allocator.buffer(bufferSize, bufferSize); + buf.writeInt(headersSize + payloadSize); + buf.writeInt( + BookieProtocol.PacketHeader.toInt( + BookieProtocol.CURRENT_PROTOCOL_VERSION, BookieProtocol.ADDENTRY, (short) flags)); + buf.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH); + + // The checksum is computed on the next part of the buffer only + buf.readerIndex(buf.writerIndex()); + buf.writeLong(ledgerId); + buf.writeLong(entryId); + buf.writeLong(lastAddConfirmed); + buf.writeLong(length); + + // Compute checksum over the headers + update(buf); + update(data); + + populateValueAndReset(buf); + + // Reset the reader index to the beginning + buf.readerIndex(0); + + if (isSmallEntry) { + buf.writeBytes(data); + data.release(); + return buf; + } else { + return ByteBufList.get(buf, data); + } + } + + private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastAddConfirmed, long length, + ByteBuf data) { + ByteBuf headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength); headersBuffer.writeLong(ledgerId); headersBuffer.writeLong(entryId); headersBuffer.writeLong(lastAddConfirmed); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java index d24022e5042..8c1480bb27f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java @@ -27,8 +27,8 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType; +import org.apache.bookkeeper.proto.MockBookieClient; import org.apache.bookkeeper.proto.checksum.DigestManager; -import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.versioning.Versioned; /** @@ -48,8 +48,8 @@ public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddCo int offset, int len) throws GeneralSecurityException { DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32, UnpooledByteBufAllocator.DEFAULT, true); - return ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length, - Unpooled.wrappedBuffer(data, offset, len))); + return MockBookieClient.copyData(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length, + Unpooled.wrappedBuffer(data, offset, len), new byte[20], 0)); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java index 7d5cad6531a..91c7e0f0784 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.client; import io.netty.buffer.ByteBuf; +import org.apache.bookkeeper.proto.MockBookieClient; import org.apache.bookkeeper.util.ByteBufList; /** @@ -28,8 +29,8 @@ */ public class LedgerHandleAdapter { - public static ByteBufList toSend(LedgerHandle lh, long entryId, ByteBuf data) { - return lh.getDigestManager().computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), - lh.addToLength(data.readableBytes()), data); + public static ByteBuf toSend(LedgerHandle lh, long entryId, ByteBuf data) { + return MockBookieClient.copyData(lh.getDigestManager().computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), + lh.addToLength(data.readableBytes()), data, new byte[20], 0)); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index 63255310c30..e7710124707 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -35,6 +35,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ReferenceCounted; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collections; @@ -67,6 +68,7 @@ import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; +import org.apache.bookkeeper.proto.MockBookieClient; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ByteBufList; @@ -504,10 +506,10 @@ protected void setupBookieClientReadEntry() { if (mockEntry != null) { LOG.info("readEntry - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress); - ByteBufList entry = macManager.computeDigestAndPackageForSending(entryId, + ReferenceCounted entry = macManager.computeDigestAndPackageForSending(entryId, mockEntry.lastAddConfirmed, mockEntry.payload.length, - Unpooled.wrappedBuffer(mockEntry.payload)); - callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, ByteBufList.coalesce(entry), + Unpooled.wrappedBuffer(mockEntry.payload), new byte[20], 0); + callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, MockBookieClient.copyData(entry), args[4]); entry.release(); } else { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java index 0fa2f0d7762..812fae88c0b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java @@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import java.io.IOException; import java.util.Enumeration; import java.util.concurrent.CompletableFuture; @@ -425,9 +426,10 @@ public void testRecoveryOnEntryGap() throws Exception { long entryId = 14; long lac = 8; byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8); - ByteBufList toSend = + ReferenceCounted toSend = lh.macManager.computeDigestAndPackageForSending( - entryId, lac, lh.getLength() + 100, Unpooled.wrappedBuffer(data, 0, data.length)); + entryId, lac, lh.getLength() + 100, Unpooled.wrappedBuffer(data, 0, data.length), + new byte[20], 0); final CountDownLatch addLatch = new CountDownLatch(1); final AtomicBoolean addSuccess = new AtomicBoolean(false); LOG.info("Add entry {} with lac = {}", entryId, lac); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java index b538a50c2a0..8e3cfd72e42 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java @@ -33,6 +33,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ReferenceCounted; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -166,10 +167,15 @@ public void testSpeculativeResponses() throws Exception { final long lac = 1L; ByteBuf data = Unpooled.copiedBuffer("test-speculative-responses", UTF_8); - ByteBufList dataWithDigest = digestManager.computeDigestAndPackageForSending( - entryId, lac, data.readableBytes(), data); - byte[] bytesWithDigest = new byte[dataWithDigest.readableBytes()]; - assertEquals(bytesWithDigest.length, dataWithDigest.getBytes(bytesWithDigest)); + ReferenceCounted refCnt = digestManager.computeDigestAndPackageForSending( + entryId, lac, data.readableBytes(), data, new byte[20], 0); + + byte[] bytesWithDigest = null; + if (refCnt instanceof ByteBufList) { + ByteBufList dataWithDigest = (ByteBufList) refCnt; + bytesWithDigest = new byte[dataWithDigest.readableBytes()]; + assertEquals(bytesWithDigest.length, dataWithDigest.getBytes(bytesWithDigest)); + } final Map callbacks = Collections.synchronizedMap(new HashMap<>()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java index 82b031c5ae8..a37462dee7d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java @@ -23,8 +23,10 @@ import static org.junit.Assert.assertEquals; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.proto.MockBookieClient; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.ByteBufList; import org.junit.Test; @@ -57,17 +59,20 @@ public void testPendingReadLacOpMissingExplicitLAC() throws Exception { public void initiate() { for (int i = 0; i < lh.getCurrentEnsemble().size(); i++) { final int index = i; - ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSending( + ReferenceCounted toSend = lh.getDigestManager().computeDigestAndPackageForSending( 2, 1, data.length, - Unpooled.wrappedBuffer(data)); + Unpooled.wrappedBuffer(data), + new byte[20], + 0); + bkc.scheduler.schedule(() -> { readLacComplete( 0, lh.getId(), null, - Unpooled.copiedBuffer(buffer.toArray()), + MockBookieClient.copyData(toSend), index); }, 0, TimeUnit.SECONDS); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java index 2de9ab5e19a..98106e22fc0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java @@ -23,7 +23,9 @@ import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -148,7 +150,7 @@ public void writeLac(BookieId addr, long ledgerId, byte[] masterKey, @Override public void addEntry(BookieId addr, long ledgerId, byte[] masterKey, - long entryId, ByteBufList toSend, WriteCallback cb, Object ctx, + long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx, int options, boolean allowFastFail, EnumSet writeFlags) { toSend.retain(); preWriteHook.runHook(addr, ledgerId, entryId) @@ -262,11 +264,11 @@ public boolean isClosed() { public void close() { } - private static ByteBuf copyData(ByteBufList list) { - ByteBuf buf = Unpooled.buffer(list.readableBytes()); - for (int i = 0; i < list.size(); i++) { - buf.writeBytes(list.getBuffer(i).slice()); + public static ByteBuf copyData(ReferenceCounted rc) { + if (rc instanceof ByteBuf) { + return Unpooled.copiedBuffer((ByteBuf) rc); + } else { + return ByteBufList.coalesce((ByteBufList) rc); } - return buf; } } \ No newline at end of file diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java index c670e87ff0c..cef77c3f99a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java @@ -84,8 +84,8 @@ public ByteBuf generateEntry(long ledgerId, long entryId, long lac) throws Excep DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DataFormats.LedgerMetadataFormat.DigestType.CRC32C, UnpooledByteBufAllocator.DEFAULT, false); - return ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending( - entryId, lac, 0, Unpooled.buffer(10))); + return ByteBufList.coalesce((ByteBufList) digestManager.computeDigestAndPackageForSending( + entryId, lac, 0, Unpooled.buffer(10), new byte[20], 0)); } From 4fd46b0333c97d6ff5130d2085495044b77d6ffc Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 17 Feb 2023 16:12:45 -0800 Subject: [PATCH 02/13] Fixed checkstyle --- .../java/org/apache/bookkeeper/client/PendingAddOp.java | 1 - .../org/apache/bookkeeper/proto/PerChannelBookieClient.java | 2 -- .../org/apache/bookkeeper/client/LedgerHandleAdapter.java | 6 +++--- .../bookkeeper/client/ParallelLedgerRecoveryTest.java | 1 - .../java/org/apache/bookkeeper/proto/MockBookieClient.java | 1 - 5 files changed, 3 insertions(+), 8 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 87954179b0b..1308ce093e4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -39,7 +39,6 @@ import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; -import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index a4a1e331aaf..851c01d72ab 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -21,7 +21,6 @@ import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.ExtensionRegistry; @@ -806,7 +805,6 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen } ByteString body = null; - Preconditions.checkArgument(toSend instanceof ByteBufList); ByteBufList bufToSend = (ByteBufList) toSend; if (bufToSend.hasArray()) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java index 91c7e0f0784..086e9f330c5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java @@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.proto.MockBookieClient; -import org.apache.bookkeeper.util.ByteBufList; /** * Adapter for tests to get the public access from LedgerHandle for its default @@ -30,7 +29,8 @@ public class LedgerHandleAdapter { public static ByteBuf toSend(LedgerHandle lh, long entryId, ByteBuf data) { - return MockBookieClient.copyData(lh.getDigestManager().computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), - lh.addToLength(data.readableBytes()), data, new byte[20], 0)); + return MockBookieClient.copyData(lh.getDigestManager() + .computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), + lh.addToLength(data.readableBytes()), data, new byte[20], 0)); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java index 812fae88c0b..4efc4465e38 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java @@ -62,7 +62,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.commons.lang3.mutable.MutableInt; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java index 98106e22fc0..419fa388c8a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java @@ -23,7 +23,6 @@ import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCounted; import java.util.Arrays; From 7901773fe39379e737617b499fcc90a8b6de6df6 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 26 Feb 2023 09:46:35 -0800 Subject: [PATCH 03/13] Fixed treating of ComposityByteBuf --- .../bookkeeper/proto/checksum/DigestManager.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index d389a9c69c9..5162468b2be 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -135,7 +135,18 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long // Compute checksum over the headers update(buf); - update(data); + + // don't unwrap slices + final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf + ? data.unwrap() : data; + ReferenceCountUtil.retain(unwrapped); + ReferenceCountUtil.safeRelease(data); + + if (unwrapped instanceof CompositeByteBuf) { + ((CompositeByteBuf) unwrapped).forEach(this::update); + } else { + update(unwrapped); + } populateValueAndReset(buf); From 5b50e062867a942360b117ecce0bb178f9753ccc Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 26 Feb 2023 21:59:38 -0800 Subject: [PATCH 04/13] Fixed merge issues --- .../bookkeeper/proto/checksum/DigestManagerBenchmark.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java index d9545620163..04fa500d476 100644 --- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java +++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java @@ -83,8 +83,9 @@ public void doSetup() throws Exception { data.writeBytes(randomBytes(entrySize)); digestBuf = ByteBufAllocator.DEFAULT.directBuffer(); - digestBuf.writeBytes(ByteBufList.coalesce( - dm.computeDigestAndPackageForSending(1234, 1234, entrySize, data))); + digestBuf.writeBytes((ByteBuf) + dm.computeDigestAndPackageForSending(1234, 1234, entrySize, data, + new byte[0], 0)); } } From cc3c34350603056a8cf53ca3065d20e23df92c64 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 27 Feb 2023 14:54:49 -0800 Subject: [PATCH 05/13] Fixed merge issues --- .../apache/bookkeeper/proto/checksum/DigestManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index f3e890bac2f..5731b850229 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -135,7 +135,7 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long buf.writeLong(length); // Compute checksum over the headers - update(buf, buf.readerIndex(), buf.readableBytes()); + int digest = update(0, buf, buf.readerIndex(), buf.readableBytes()); // don't unwrap slices final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf @@ -147,13 +147,13 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long CompositeByteBuf cbb = (CompositeByteBuf) unwrapped; for (int i = 0; i < cbb.numComponents(); i++) { ByteBuf b = cbb.component(i); - update(b, b.readerIndex(), b.readableBytes()); + digest = update(digest, b, b.readerIndex(), b.readableBytes()); } } else { - update(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); + digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); } - populateValueAndReset(buf); + populateValueAndReset(digest, buf); // Reset the reader index to the beginning buf.readerIndex(0); From 8791faf184c5ae37bb4fe9570ed998de76dbf431 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 27 Feb 2023 21:20:33 -0800 Subject: [PATCH 06/13] WIP --- .../org/apache/bookkeeper/proto/MockBookieClient.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java index 419fa388c8a..5849d497298 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java @@ -264,10 +264,14 @@ public void close() { } public static ByteBuf copyData(ReferenceCounted rc) { + ByteBuf res; if (rc instanceof ByteBuf) { - return Unpooled.copiedBuffer((ByteBuf) rc); + res = Unpooled.copiedBuffer((ByteBuf) rc); } else { - return ByteBufList.coalesce((ByteBufList) rc); + res = ByteBufList.coalesce((ByteBufList) rc); } + + rc.release(); + return res; } } \ No newline at end of file From ca4a555b9f173f3f3e7e39fc18208e40d24b3e9d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 28 Feb 2023 11:16:18 -0800 Subject: [PATCH 07/13] Fixed test and removed dead code --- .../bookkeeper/proto/BookieProtoEncoding.java | 18 +------ .../bookkeeper/proto/BookieProtocol.java | 52 ------------------- .../proto/PerChannelBookieClient.java | 8 --- .../proto/BookieBackpressureForV2Test.java | 4 ++ .../bookkeeper/proto/ProtocolBenchmark.java | 16 ------ 5 files changed, 5 insertions(+), 93 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 6d4632471d1..957251acfb5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -110,23 +110,7 @@ public Object encode(Object msg, ByteBufAllocator allocator) return msg; } BookieProtocol.Request r = (BookieProtocol.Request) msg; - if (r instanceof BookieProtocol.AddRequest) { - BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r; - ByteBufList data = ar.getData(); - - int totalHeaderSize = 4 // for the request header - + BookieProtocol.MASTER_KEY_LENGTH; // for the master key - - int totalPayloadSize = totalHeaderSize + data.readableBytes(); - ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */); - buf.writeInt(totalPayloadSize); // Frame header - buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags())); - buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); - - ar.recycle(); - data.prepend(buf); - return data; - } else if (r instanceof BookieProtocol.ReadRequest) { + if (r instanceof BookieProtocol.ReadRequest) { int totalHeaderSize = 4 // for request type + 8 // for ledgerId + 8; // for entryId diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index feeb2499c82..367840cde12 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -252,58 +252,6 @@ public String toString() { public void recycle() {} } - /** - * A Request that adds data. - */ - class AddRequest extends Request { - ByteBufList data; - - static AddRequest create(byte protocolVersion, long ledgerId, - long entryId, short flags, byte[] masterKey, - ByteBufList data) { - AddRequest add = RECYCLER.get(); - add.protocolVersion = protocolVersion; - add.opCode = ADDENTRY; - add.ledgerId = ledgerId; - add.entryId = entryId; - add.flags = flags; - add.masterKey = masterKey; - add.data = data.retain(); - return add; - } - - ByteBufList getData() { - // We need to have different ByteBufList instances for each bookie write - return ByteBufList.clone(data); - } - - boolean isRecoveryAdd() { - return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD; - } - - private final Handle recyclerHandle; - private AddRequest(Handle recyclerHandle) { - this.recyclerHandle = recyclerHandle; - } - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected AddRequest newObject(Handle handle) { - return new AddRequest(handle); - } - }; - - @Override - public void recycle() { - ledgerId = -1; - entryId = -1; - masterKey = null; - ReferenceCountUtil.safeRelease(data); - data = null; - recyclerHandle.recycle(this); - } - } - /** * This is similar to add request, but it used when processing the request on the bookie side. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index f0ac75e555a..fa26884ab48 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1150,14 +1150,6 @@ private void writeAndFlush(final Channel channel, StringUtils.requestToString(request)); errorOut(key, BKException.Code.TooManyRequestsException); - - // If the request is a V2 add request, we retained the data's reference when creating the AddRequest - // object. To avoid the object leak, we need to release the reference if we met any errors - // before sending it. - if (request instanceof BookieProtocol.AddRequest) { - BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) request; - ar.recycle(); - } return; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java index ce8d65fb76b..8721a2c7819 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java @@ -18,6 +18,8 @@ */ package org.apache.bookkeeper.proto; +import org.apache.bookkeeper.client.BookKeeperTestClient; +import org.apache.bookkeeper.test.TestStatsProvider; import org.junit.Before; /** @@ -30,6 +32,8 @@ public class BookieBackpressureForV2Test extends BookieBackpressureTest { public void setUp() throws Exception { super.setUp(); baseClientConf.setUseV2WireProtocol(true); + bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider()); + // the backpressure will bloc the read response, disable it to let it use backpressure mechanism confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false); } diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java index 7eb8fa97746..308463608b5 100644 --- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java +++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java @@ -79,22 +79,6 @@ public void prepare() { this.reqEnDeV3 = new RequestEnDecoderV3(null); } - - @Benchmark - public void testAddEntryV2() throws Exception { - ByteBufList list = ByteBufList.get(entry.retainedSlice()); - BookieProtocol.AddRequest req = BookieProtocol.AddRequest.create( - BookieProtocol.CURRENT_PROTOCOL_VERSION, - ledgerId, - entryId, - flags, - masterKey, - list); - Object res = this.reqEnDeV2.encode(req, ByteBufAllocator.DEFAULT); - ReferenceCountUtil.release(res); - ReferenceCountUtil.release(list); - } - @Benchmark public void testAddEntryV3() throws Exception { // Build the request and calculate the total size to be included in the packet. From e58432317d5799557bc9c2fd2338b169210660f0 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 28 Feb 2023 11:38:08 -0800 Subject: [PATCH 08/13] Removed unused import --- .../main/java/org/apache/bookkeeper/proto/BookieProtocol.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 367840cde12..d04ab187d4d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -27,7 +27,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; -import org.apache.bookkeeper.util.ByteBufList; /** * The packets of the Bookie protocol all have a 4-byte integer indicating the From 28d35813fdbc10cf03de2ec60175a63b47b8946e Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 28 Feb 2023 16:25:20 -0800 Subject: [PATCH 09/13] Fixed BookieJournalTest --- .../java/org/apache/bookkeeper/proto/MockBookieClient.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java index 5849d497298..9d5d8ffc02f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCounted; import java.util.Arrays; @@ -271,6 +272,9 @@ public static ByteBuf copyData(ReferenceCounted rc) { res = ByteBufList.coalesce((ByteBufList) rc); } + // Skip headers + res.skipBytes(28); + rc.release(); return res; } From 3390bc5212afcda6d91ccf16e364809871a3da3a Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 28 Feb 2023 16:39:42 -0800 Subject: [PATCH 10/13] removed unused import --- .../java/org/apache/bookkeeper/proto/MockBookieClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java index 9d5d8ffc02f..7d165b089b8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java @@ -23,7 +23,6 @@ import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCounted; import java.util.Arrays; @@ -278,4 +277,4 @@ public static ByteBuf copyData(ReferenceCounted rc) { rc.release(); return res; } -} \ No newline at end of file +} From 822eb3e3326b322000489b61a50ebf01d302c9c4 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 20 Mar 2023 17:17:53 +0800 Subject: [PATCH 11/13] fix the checkstyle --- .../org/apache/bookkeeper/proto/checksum/DigestManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index 5731b850229..375c2bd7e1e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -110,7 +110,7 @@ public ReferenceCounted computeDigestAndPackageForSending(long entryId, long las } private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long lastAddConfirmed, long length, - ByteBuf data, byte[] masterKey, int flags) { + ByteBuf data, byte[] masterKey, int flags) { boolean isSmallEntry = data.readableBytes() < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD; int headersSize = 4 // Request header From eeedef6357f08db627d5320d49a4b7aa2a1e8f30 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 20 Mar 2023 19:54:58 +0800 Subject: [PATCH 12/13] fix failed test --- .../test/java/org/apache/bookkeeper/proto/MockBookieClient.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java index 7d165b089b8..ff5e86d22d4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java @@ -273,8 +273,6 @@ public static ByteBuf copyData(ReferenceCounted rc) { // Skip headers res.skipBytes(28); - - rc.release(); return res; } } From 76c4768ee6ce725d6a803e34d03814a9c014c408 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 20 Mar 2023 20:54:23 +0800 Subject: [PATCH 13/13] fix failed test --- .../org/apache/bookkeeper/client/ClientUtil.java | 4 ++-- .../apache/bookkeeper/proto/MockBookieClient.java | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java index 8c1480bb27f..3f8af53c133 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java @@ -48,8 +48,8 @@ public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddCo int offset, int len) throws GeneralSecurityException { DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32, UnpooledByteBufAllocator.DEFAULT, true); - return MockBookieClient.copyData(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length, - Unpooled.wrappedBuffer(data, offset, len), new byte[20], 0)); + return MockBookieClient.copyDataWithSkipHeader(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, + length, Unpooled.wrappedBuffer(data, offset, len), new byte[20], 0)); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java index ff5e86d22d4..c4344c74d00 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java @@ -271,8 +271,21 @@ public static ByteBuf copyData(ReferenceCounted rc) { res = ByteBufList.coalesce((ByteBufList) rc); } + return res; + } + + public static ByteBuf copyDataWithSkipHeader(ReferenceCounted rc) { + ByteBuf res; + if (rc instanceof ByteBuf) { + res = Unpooled.copiedBuffer((ByteBuf) rc); + } else { + res = ByteBufList.coalesce((ByteBufList) rc); + } + // Skip headers res.skipBytes(28); + rc.release(); + return res; } }