Skip to content

Commit

Permalink
Fix checksum calculation bug when the payload is a CompositeByteBuf w…
Browse files Browse the repository at this point in the history
…ith readerIndex > 0 (#4196)

* Add a test that reproduces a bug in checksum calculation

* Revert "Fixed unnecessary copy to heap (#2701)" changes to ByteBufList

This partially reverts commit 3c9c710.

* Remove CompositeBuffer unwrapping in DigestManager

* Rename update -> internalUpdate so that unwrapping logic could be added to update

* Remove unnecessary unwrapping logic in Java9IntHash

* Add safe way to handle CompositeByteBuf

* Add license header

* Fix checkstyle

* Refactor ByteBuf visitor solution

* Fix checkstyle

* Reformat

* Refactor recursive visiting

* Revisit equals, hashCode and toString

* Refactor test case

* Add support for UnpooledHeapByteBuf.getBytes which passes an array

* Add support for visiting buffers backed by byte[] arrays

- getBytes calls setBytes with a byte[] argument for
  heap ByteBufs

* Move ByteBufVisitor to org.apache.bookkeeper.util package

* Update javadoc

* Refactor to use stateless visitor so that instance can be shared

* Improve test so that a single scenario can be used for debugging

* Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == ~crc32c_update(~x)

- Java9IntHash uses private methods from java.util.zip.CRC32C class,
  updateBytes and updateDirectByteBuffer.
  When inspecting the use and interface contract, it doesn't match
  how it is used in Java9IntHash. This PR addresses that by introducing
  a separate initial value for initializing the accumulated value
  so that the initial value could match the logic in
  java.util.zip.CRC32C.reset method. There's also a separate
  method for finalizing the accumulated value into a final
  checksum value. This is to match the java.util.zip.CRC32C.getValue
  method's logic (uses bitwise complement operator ~).

- With a quick glance, it might appear that the previous logic is similar.
  However it isn't since I have a failing test which gets fixed with this
  change. I haven't yet added the Java9IntHash level unit test case to prove how
  it differs. It must be related to integer value overflow. For the CRC32C function,
  I believe it means that it cannot be assumed in all cases that
  func(x) == ~func(~x). That's the assumption that the previous code was making.
  It probably applies for many inputs, but not all. It would break in overflow
  cases.

* Fix checkstyle

* Fix checkstyle

* Fix missing depth increment that prevents StackOverflowException

* Properly handle the depth increase and decrease

* Remove unnecessary condition

* Use more efficient way to read bytes to the target array

* Don't use ByteBufVisitor if it's not necessary

* Revert "Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == ~crc32c_update(~x)"

This reverts commit 272e962.

* Fix issue in resume byte[] version that was added

- input and output should be complemented. explanation has been added to the
  resume ByteBuf method

* Polish ByteBufVisitor

- reuse GetBytesCallbackByteBuf instance for handling the root ByteBuf instance

* Use extracted method

* Fix bug with array handling

* Polish ByteBufVisitor

* Optimize the buffer copying in the case where array or memory address cannot be accessed

- read-only buffers will need to be copied before reading
  - use ByteBuf.copy for direct buffers with pooled allocator when the algorithm can accept
    a memory address buffer
- use the 64kB threadlocal byte[] buffer for copying all other inputs

* Check if memory address is accepted

* Improve comments about complement (current = ~current) in resume

* Print thread dump when build is cancelled

* Filter empty buffers and arrays in ByteBufVisitor
  • Loading branch information
lhotari authored Feb 7, 2024
1 parent 2209734 commit 9c373f7
Show file tree
Hide file tree
Showing 17 changed files with 1,618 additions and 132 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/bk-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ jobs:
path: surefire-reports
retention-days: 7

- name: print JVM thread dumps when cancelled
if: cancelled()
run: ./dev/ci-tool print_thread_dumps

integration-tests:
name: Integration Tests
runs-on: ubuntu-latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,17 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, buffer, offset, len);
}

@Override
boolean acceptsMemoryAddressBuffer() {
return Crc32cIntChecksum.acceptsMemoryAddressBuffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ interface CRC32Digest {
long getValueAndReset();

void update(ByteBuf buf, int offset, int len);
void update(byte[] buffer, int offset, int len);
}

private static final FastThreadLocal<CRC32Digest> crc = new FastThreadLocal<CRC32Digest>() {
Expand Down Expand Up @@ -62,14 +63,25 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
crc.get().update(data, offset, len);
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
crc.get().update(buffer, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
// This is stored as 8 bytes
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return DirectMemoryCRC32Digest.isSupported();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.FastThreadLocal;
import java.security.GeneralSecurityException;
Expand All @@ -34,6 +32,7 @@
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.ByteBufVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,10 +52,25 @@ public abstract class DigestManager {
final long ledgerId;
final boolean useV2Protocol;
private final ByteBufAllocator allocator;
private final DigestUpdaterByteBufVisitorCallback byteBufVisitorCallback;

abstract int getMacCodeLength();

abstract int update(int digest, ByteBuf buffer, int offset, int len);
abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len);

abstract int internalUpdate(int digest, byte[] buffer, int offset, int len);

final int update(int digest, ByteBuf buffer, int offset, int len) {
if (buffer.hasMemoryAddress() && acceptsMemoryAddressBuffer()) {
return internalUpdate(digest, buffer, offset, len);
} else if (buffer.hasArray()) {
return internalUpdate(digest, buffer.array(), buffer.arrayOffset() + offset, len);
} else {
UpdateContext updateContext = new UpdateContext(digest);
ByteBufVisitor.visitBuffers(buffer, offset, len, byteBufVisitorCallback, updateContext);
return updateContext.digest;
}
}

abstract void populateValueAndReset(int digest, ByteBuf buffer);

Expand All @@ -69,6 +83,7 @@ public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allo
this.useV2Protocol = useV2Protocol;
this.macCodeLength = getMacCodeLength();
this.allocator = allocator;
this.byteBufVisitorCallback = new DigestUpdaterByteBufVisitorCallback();
}

public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
Expand Down Expand Up @@ -136,34 +151,19 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long

// Compute checksum over the headers
int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.safeRelease(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());

populateValueAndReset(digest, buf);

// Reset the reader index to the beginning
buf.readerIndex(0);

if (isSmallEntry) {
buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
unwrapped.release();
buf.writeBytes(data, data.readerIndex(), data.readableBytes());
data.release();
return buf;
} else {
return ByteBufList.get(buf, unwrapped);
return ByteBufList.get(buf, data);
}
}

Expand All @@ -176,25 +176,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA
headersBuffer.writeLong(length);

int digest = update(0, headersBuffer, 0, METADATA_LENGTH);

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer, unwrapped);
return ByteBufList.get(headersBuffer, data);
}

/**
Expand Down Expand Up @@ -373,4 +357,34 @@ public RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) thr
long length = dataReceived.readLong();
return new RecoveryData(lastAddConfirmed, length);
}

private static class UpdateContext {
int digest;

UpdateContext(int digest) {
this.digest = digest;
}
}

private class DigestUpdaterByteBufVisitorCallback implements ByteBufVisitor.ByteBufVisitorCallback<UpdateContext> {

@Override
public void visitBuffer(UpdateContext context, ByteBuf visitBuffer, int visitIndex, int visitLength) {
// recursively visit the sub buffer and update the digest
context.digest = internalUpdate(context.digest, visitBuffer, visitIndex, visitLength);
}

@Override
public void visitArray(UpdateContext context, byte[] visitArray, int visitIndex, int visitLength) {
// update the digest with the array
context.digest = internalUpdate(context.digest, visitArray, visitIndex, visitLength);
}

@Override
public boolean acceptsMemoryAddress(UpdateContext context) {
return DigestManager.this.acceptsMemoryAddressBuffer();
}
}

abstract boolean acceptsMemoryAddressBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,32 @@ public void update(ByteBuf buf, int index, int length) {
crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length);
} else if (buf.hasArray()) {
// Use the internal method to update from array based
crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length);
crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length);
} else {
// Fallback to data copy if buffer is not contiguous
byte[] b = new byte[length];
buf.getBytes(index, b, 0, length);
crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length);
crcValue = updateArray(crcValue, b, 0, length);
}
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static int updateArray(int crcValue, byte[] buf, int offset, int length)
throws IllegalAccessException, InvocationTargetException {
return (int) updateBytes.invoke(null, crcValue, buf, offset, length);
}

@Override
public void update(byte[] buffer, int offset, int len) {
try {
crcValue = updateArray(crcValue, buffer, offset, len);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static final Method updateByteBuffer;
private static final Method updateBytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ int getMacCodeLength() {
}

@Override
int update(int digest, ByteBuf buffer, int offset, int len) {
int internalUpdate(int digest, ByteBuf buffer, int offset, int len) {
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return 0;
}

Expand All @@ -49,4 +54,9 @@ void populateValueAndReset(int digest, ByteBuf buffer) {}
boolean isInt32Digest() {
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,24 @@ void populateValueAndReset(int digest, ByteBuf buffer) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
mac.get().update(data.slice(offset, len).nioBuffer());
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
mac.get().update(buffer, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public long getValueAndReset() {
public void update(ByteBuf buf, int offset, int len) {
crc.update(buf.slice(offset, len).nioBuffer());
}

@Override
public void update(byte[] buffer, int offset, int len) {
crc.update(buffer, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -133,43 +132,14 @@ public static ByteBufList get() {
* Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
*/
public void add(ByteBuf buf) {
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(b -> {
ReferenceCountUtil.retain(b);
buffers.add(b);
});
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(unwrapped);
}
buffers.add(buf);
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
public void prepend(ByteBuf buf) {
// don't unwrap slices
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
for (int i = composite.numComponents() - 1; i >= 0; i--) {
ByteBuf b = composite.component(i);
ReferenceCountUtil.retain(b);
buffers.add(0, b);
}
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(0, unwrapped);
}
buffers.add(0, buf);
}

/**
Expand Down Expand Up @@ -285,7 +255,7 @@ public ByteBufList retain() {
@Override
protected void deallocate() {
for (int i = 0; i < buffers.size(); i++) {
ReferenceCountUtil.release(buffers.get(i));
buffers.get(i).release();
}

buffers.clear();
Expand Down
Loading

0 comments on commit 9c373f7

Please sign in to comment.