Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 #4196

Merged
merged 39 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9fb47f5
Add a test that reproduces a bug in checksum calculation
lhotari Jan 31, 2024
0707710
Revert "Fixed unnecessary copy to heap (#2701)" changes to ByteBufList
lhotari Jan 30, 2024
fe9301d
Remove CompositeBuffer unwrapping in DigestManager
lhotari Jan 30, 2024
532de4f
Rename update -> internalUpdate so that unwrapping logic could be add…
lhotari Jan 30, 2024
9a39d6b
Remove unnecessary unwrapping logic in Java9IntHash
lhotari Jan 30, 2024
986942d
Add safe way to handle CompositeByteBuf
lhotari Jan 31, 2024
0669a66
Add license header
lhotari Jan 31, 2024
4036a3a
Fix checkstyle
lhotari Jan 31, 2024
7a8c200
Refactor ByteBuf visitor solution
lhotari Jan 31, 2024
b5b42ee
Fix checkstyle
lhotari Jan 31, 2024
81e9d2d
Reformat
lhotari Jan 31, 2024
6c7e6e0
Refactor recursive visiting
lhotari Jan 31, 2024
c19ae5d
Revisit equals, hashCode and toString
lhotari Jan 31, 2024
8928f98
Refactor test case
lhotari Feb 1, 2024
19e33ca
Add support for UnpooledHeapByteBuf.getBytes which passes an array
lhotari Feb 1, 2024
86d002d
Add support for visiting buffers backed by byte[] arrays
lhotari Feb 1, 2024
e793872
Move ByteBufVisitor to org.apache.bookkeeper.util package
lhotari Feb 1, 2024
6508bf5
Update javadoc
lhotari Feb 1, 2024
5cd2dd8
Refactor to use stateless visitor so that instance can be shared
lhotari Feb 1, 2024
fc93b5e
Improve test so that a single scenario can be used for debugging
lhotari Feb 2, 2024
272e962
Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == …
lhotari Feb 2, 2024
b4d0b18
Fix checkstyle
lhotari Feb 2, 2024
9c1b68f
Fix checkstyle
lhotari Feb 2, 2024
a751294
Fix missing depth increment that prevents StackOverflowException
lhotari Feb 3, 2024
5f8fef5
Properly handle the depth increase and decrease
lhotari Feb 3, 2024
bb703ab
Remove unnecessary condition
lhotari Feb 3, 2024
2b79716
Use more efficient way to read bytes to the target array
lhotari Feb 3, 2024
f650b9b
Don't use ByteBufVisitor if it's not necessary
lhotari Feb 3, 2024
d21a5cd
Revert "Fix bug in Java9IntHash calculation that assumed crc32c_updat…
lhotari Feb 3, 2024
a9ed5e4
Fix issue in resume byte[] version that was added
lhotari Feb 3, 2024
b7d5795
Polish ByteBufVisitor
lhotari Feb 4, 2024
34c1c8a
Use extracted method
lhotari Feb 4, 2024
be20948
Fix bug with array handling
lhotari Feb 4, 2024
7998659
Polish ByteBufVisitor
lhotari Feb 4, 2024
b691316
Optimize the buffer copying in the case where array or memory address…
lhotari Feb 4, 2024
0af1556
Check if memory address is accepted
lhotari Feb 4, 2024
317aab7
Improve comments about complement (current = ~current) in resume
lhotari Feb 4, 2024
2c73dca
Print thread dump when build is cancelled
lhotari Feb 1, 2024
c215db8
Filter empty buffers and arrays in ByteBufVisitor
lhotari Feb 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/bk-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ jobs:
path: surefire-reports
retention-days: 7

- name: print JVM thread dumps when cancelled
if: cancelled()
run: ./dev/ci-tool print_thread_dumps

lhotari marked this conversation as resolved.
Show resolved Hide resolved
integration-tests:
name: Integration Tests
runs-on: ubuntu-latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,17 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, buffer, offset, len);
}

@Override
boolean acceptsMemoryAddressBuffer() {
return Crc32cIntChecksum.acceptsMemoryAddressBuffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ interface CRC32Digest {
long getValueAndReset();

void update(ByteBuf buf, int offset, int len);
void update(byte[] buffer, int offset, int len);
}

private static final FastThreadLocal<CRC32Digest> crc = new FastThreadLocal<CRC32Digest>() {
Expand Down Expand Up @@ -62,14 +63,25 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
crc.get().update(data, offset, len);
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
crc.get().update(buffer, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
// This is stored as 8 bytes
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return DirectMemoryCRC32Digest.isSupported();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.FastThreadLocal;
import java.security.GeneralSecurityException;
Expand All @@ -34,6 +32,7 @@
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.ByteBufVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,10 +52,25 @@ public abstract class DigestManager {
final long ledgerId;
final boolean useV2Protocol;
private final ByteBufAllocator allocator;
private final DigestUpdaterByteBufVisitorCallback byteBufVisitorCallback;

abstract int getMacCodeLength();

abstract int update(int digest, ByteBuf buffer, int offset, int len);
abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len);

abstract int internalUpdate(int digest, byte[] buffer, int offset, int len);

final int update(int digest, ByteBuf buffer, int offset, int len) {
if (buffer.hasMemoryAddress() && acceptsMemoryAddressBuffer()) {
return internalUpdate(digest, buffer, offset, len);
} else if (buffer.hasArray()) {
return internalUpdate(digest, buffer.array(), buffer.arrayOffset() + offset, len);
} else {
UpdateContext updateContext = new UpdateContext(digest);
ByteBufVisitor.visitBuffers(buffer, offset, len, byteBufVisitorCallback, updateContext);
return updateContext.digest;
}
}

abstract void populateValueAndReset(int digest, ByteBuf buffer);

Expand All @@ -69,6 +83,7 @@ public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allo
this.useV2Protocol = useV2Protocol;
this.macCodeLength = getMacCodeLength();
this.allocator = allocator;
this.byteBufVisitorCallback = new DigestUpdaterByteBufVisitorCallback();
}

public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
Expand Down Expand Up @@ -136,34 +151,19 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long

// Compute checksum over the headers
int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.safeRelease(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());

populateValueAndReset(digest, buf);

// Reset the reader index to the beginning
buf.readerIndex(0);

if (isSmallEntry) {
buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
unwrapped.release();
buf.writeBytes(data, data.readerIndex(), data.readableBytes());
data.release();
return buf;
} else {
return ByteBufList.get(buf, unwrapped);
return ByteBufList.get(buf, data);
}
}

Expand All @@ -176,25 +176,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA
headersBuffer.writeLong(length);

int digest = update(0, headersBuffer, 0, METADATA_LENGTH);

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer, unwrapped);
return ByteBufList.get(headersBuffer, data);
}

/**
Expand Down Expand Up @@ -373,4 +357,38 @@ public RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) thr
long length = dataReceived.readLong();
return new RecoveryData(lastAddConfirmed, length);
}

private static class UpdateContext {
int digest;

UpdateContext(int digest) {
this.digest = digest;
}
}

private class DigestUpdaterByteBufVisitorCallback implements ByteBufVisitor.ByteBufVisitorCallback<UpdateContext> {

@Override
public void visitBuffer(UpdateContext context, ByteBuf visitBuffer, int visitIndex, int visitLength) {
if (visitLength > 0) {
// recursively visit the sub buffer and update the digest
context.digest = internalUpdate(context.digest, visitBuffer, visitIndex, visitLength);
}
}

@Override
public void visitArray(UpdateContext context, byte[] visitArray, int visitIndex, int visitLength) {
if (visitLength > 0) {
// update the digest with the array
context.digest = internalUpdate(context.digest, visitArray, visitIndex, visitLength);
}
}

@Override
public boolean acceptsMemoryAddress(UpdateContext context) {
return DigestManager.this.acceptsMemoryAddressBuffer();
}
}

abstract boolean acceptsMemoryAddressBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,32 @@ public void update(ByteBuf buf, int index, int length) {
crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length);
} else if (buf.hasArray()) {
// Use the internal method to update from array based
crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length);
crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length);
} else {
// Fallback to data copy if buffer is not contiguous
byte[] b = new byte[length];
buf.getBytes(index, b, 0, length);
crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length);
crcValue = updateArray(crcValue, b, 0, length);
}
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static int updateArray(int crcValue, byte[] buf, int offset, int length)
throws IllegalAccessException, InvocationTargetException {
return (int) updateBytes.invoke(null, crcValue, buf, offset, length);
}

@Override
public void update(byte[] buffer, int offset, int len) {
try {
crcValue = updateArray(crcValue, buffer, offset, len);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static final Method updateByteBuffer;
private static final Method updateBytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ int getMacCodeLength() {
}

@Override
int update(int digest, ByteBuf buffer, int offset, int len) {
int internalUpdate(int digest, ByteBuf buffer, int offset, int len) {
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return 0;
}

Expand All @@ -49,4 +54,9 @@ void populateValueAndReset(int digest, ByteBuf buffer) {}
boolean isInt32Digest() {
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,24 @@ void populateValueAndReset(int digest, ByteBuf buffer) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
mac.get().update(data.slice(offset, len).nioBuffer());
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
mac.get().update(buffer, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public long getValueAndReset() {
public void update(ByteBuf buf, int offset, int len) {
crc.update(buf.slice(offset, len).nioBuffer());
}

@Override
public void update(byte[] buffer, int offset, int len) {
crc.update(buffer, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -133,43 +132,14 @@ public static ByteBufList get() {
* Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
*/
public void add(ByteBuf buf) {
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(b -> {
ReferenceCountUtil.retain(b);
buffers.add(b);
});
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(unwrapped);
}
buffers.add(buf);
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
public void prepend(ByteBuf buf) {
// don't unwrap slices
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
for (int i = composite.numComponents() - 1; i >= 0; i--) {
ByteBuf b = composite.component(i);
ReferenceCountUtil.retain(b);
buffers.add(0, b);
}
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(0, unwrapped);
}
buffers.add(0, buf);
}

/**
Expand Down Expand Up @@ -285,7 +255,7 @@ public ByteBufList retain() {
@Override
protected void deallocate() {
for (int i = 0; i < buffers.size(); i++) {
ReferenceCountUtil.release(buffers.get(i));
buffers.get(i).release();
}

buffers.clear();
Expand Down
Loading
Loading