Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Single buffer add tests #32

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;

import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -403,17 +405,24 @@ public void readComplete(int rc, LedgerHandle lh,
numEntriesRead.inc();
numBytesRead.registerSuccessfulValue(dataLength);

ByteBufList toSend = lh.getDigestManager()
ReferenceCounted toSend = lh.getDigestManager()
.computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
Unpooled.wrappedBuffer(data, 0, data.length),
lh.getLedgerKey(),
0
);
if (replicationThrottle != null) {
updateAverageEntrySize(toSend.readableBytes());
if (toSend instanceof ByteBuf) {
updateAverageEntrySize(((ByteBuf) toSend).readableBytes());
} else if (toSend instanceof ByteBufList) {
updateAverageEntrySize(((ByteBufList) toSend).readableBytes());
}
}
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
lh.getLedgerKey(), entryId, ByteBufList.clone(toSend),
lh.getLedgerKey(), entryId, toSend,
multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD,
false, WriteFlag.NONE);
writeDataLatency.registerSuccessfulEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
Expand All @@ -38,7 +39,6 @@
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,7 +56,7 @@ class PendingAddOp implements WriteCallback {
private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);

ByteBuf payload;
ByteBufList toSend;
ReferenceCounted toSend;
AddCallbackWithLatency cb;
Object ctx;
long entryId;
Expand Down Expand Up @@ -242,9 +242,10 @@ public synchronized void initiate() {
checkNotNull(lh);
checkNotNull(lh.macManager);

int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
this.toSend = lh.macManager.computeDigestAndPackageForSending(
entryId, lh.lastAddConfirmed, currentLedgerLength,
payload);
payload, lh.ledgerKey, flags);
// ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
payload = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME;

import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -39,6 +40,7 @@
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -358,8 +360,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} else {
waitingForAuth.add(msg);
}
} else if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
waitingForAuth.add(msg);
} else {
LOG.info("dropping write of message {}", msg);
LOG.info("[{}] dropping write of message {}", ctx.channel(), msg);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.proto;

import io.netty.util.ReferenceCounted;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -139,7 +140,7 @@ void writeLac(BookieId address, long ledgerId, byte[] masterKey,
* {@link org.apache.bookkeeper.client.api.WriteFlag}
*/
void addEntry(BookieId address, long ledgerId, byte[] masterKey,
long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx,
int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.EnumSet;
Expand Down Expand Up @@ -288,7 +289,7 @@ public void addEntry(final BookieId addr,
final long ledgerId,
final byte[] masterKey,
final long entryId,
final ByteBufList toSend,
final ReferenceCounted toSend,
final WriteCallback cb,
final Object ctx,
final int options,
Expand Down Expand Up @@ -357,7 +358,7 @@ private static class ChannelReadyForAddEntryCallback
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;

private BookieClientImpl bookieClient;
private ByteBufList toSend;
private ReferenceCounted toSend;
private long ledgerId;
private long entryId;
private BookieId addr;
Expand All @@ -369,7 +370,7 @@ private static class ChannelReadyForAddEntryCallback
private EnumSet<WriteFlag> writeFlags;

static ChannelReadyForAddEntryCallback create(
BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
BookieClientImpl bookieClient, ReferenceCounted toSend, long ledgerId,
long entryId, BookieId addr, Object ctx,
WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
EnumSet<WriteFlag> writeFlags) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,7 @@ public Object encode(Object msg, ByteBufAllocator allocator)
return msg;
}
BookieProtocol.Request r = (BookieProtocol.Request) msg;
if (r instanceof BookieProtocol.AddRequest) {
BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r;
ByteBufList data = ar.getData();

int totalHeaderSize = 4 // for the request header
+ BookieProtocol.MASTER_KEY_LENGTH; // for the master key

int totalPayloadSize = totalHeaderSize + data.readableBytes();
ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */);
buf.writeInt(totalPayloadSize); // Frame header
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);

ar.recycle();
data.prepend(buf);
return data;
} else if (r instanceof BookieProtocol.ReadRequest) {
if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
+ 8; // for entryId
Expand Down Expand Up @@ -437,7 +421,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (LOG.isTraceEnabled()) {
LOG.trace("Encode request {} to channel {}.", msg, ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Request) {
if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
ctx.write(msg, promise);
} else if (msg instanceof BookkeeperProtocol.Request) {
ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Request) {
ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;

/**
* The packets of the Bookie protocol all have a 4-byte integer indicating the
Expand Down Expand Up @@ -252,58 +251,6 @@ public String toString() {
public void recycle() {}
}

/**
* A Request that adds data.
*/
class AddRequest extends Request {
ByteBufList data;

static AddRequest create(byte protocolVersion, long ledgerId,
long entryId, short flags, byte[] masterKey,
ByteBufList data) {
AddRequest add = RECYCLER.get();
add.protocolVersion = protocolVersion;
add.opCode = ADDENTRY;
add.ledgerId = ledgerId;
add.entryId = entryId;
add.flags = flags;
add.masterKey = masterKey;
add.data = data.retain();
return add;
}

ByteBufList getData() {
// We need to have different ByteBufList instances for each bookie write
return ByteBufList.clone(data);
}

boolean isRecoveryAdd() {
return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
}

private final Handle<AddRequest> recyclerHandle;
private AddRequest(Handle<AddRequest> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<AddRequest> RECYCLER = new Recycler<AddRequest>() {
@Override
protected AddRequest newObject(Handle<AddRequest> handle) {
return new AddRequest(handle);
}
};

@Override
public void recycle() {
ledgerId = -1;
entryId = -1;
masterKey = null;
ReferenceCountUtil.release(data);
data = null;
recyclerHandle.recycle(this);
}
}

/**
* This is similar to add request, but it used when processing the request on the bookie side.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
Expand Down Expand Up @@ -771,7 +772,7 @@ void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
* @param writeFlags
* WriteFlags
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ReferenceCounted toSend, WriteCallback cb,
Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
Expand All @@ -782,9 +783,12 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
return;
}
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
request = BookieProtocol.AddRequest.create(
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
(short) options, masterKey, toSend);

if (toSend instanceof ByteBuf) {
request = ((ByteBuf) toSend).retainedDuplicate();
} else {
request = ByteBufList.clone((ByteBufList) toSend);
}
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
Expand All @@ -799,11 +803,14 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
}

ByteString body = null;
if (toSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
ByteBufList bufToSend = (ByteBufList) toSend;

if (bufToSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(),
bufToSend.readableBytes());
} else {
for (int i = 0; i < toSend.size(); i++) {
ByteString piece = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer());
for (int i = 0; i < bufToSend.size(); i++) {
ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
// use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs
body = (body == null) ? piece : body.concat(piece);
}
Expand Down Expand Up @@ -1143,14 +1150,6 @@ private void writeAndFlush(final Channel channel,
StringUtils.requestToString(request));

errorOut(key, BKException.Code.TooManyRequestsException);

// If the request is a V2 add request, we retained the data's reference when creating the AddRequest
// object. To avoid the object leak, we need to release the reference if we met any errors
// before sending it.
if (request instanceof BookieProtocol.AddRequest) {
BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) request;
ar.recycle();
}
return;
}

Expand Down
Loading