Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 #4196

Merged
merged 39 commits into from
Feb 7, 2024
Merged
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9fb47f5
Add a test that reproduces a bug in checksum calculation
lhotari Jan 31, 2024
0707710
Revert "Fixed unnecessary copy to heap (#2701)" changes to ByteBufList
lhotari Jan 30, 2024
fe9301d
Remove CompositeBuffer unwrapping in DigestManager
lhotari Jan 30, 2024
532de4f
Rename update -> internalUpdate so that unwrapping logic could be add…
lhotari Jan 30, 2024
9a39d6b
Remove unnecessary unwrapping logic in Java9IntHash
lhotari Jan 30, 2024
986942d
Add safe way to handle CompositeByteBuf
lhotari Jan 31, 2024
0669a66
Add license header
lhotari Jan 31, 2024
4036a3a
Fix checkstyle
lhotari Jan 31, 2024
7a8c200
Refactor ByteBuf visitor solution
lhotari Jan 31, 2024
b5b42ee
Fix checkstyle
lhotari Jan 31, 2024
81e9d2d
Reformat
lhotari Jan 31, 2024
6c7e6e0
Refactor recursive visiting
lhotari Jan 31, 2024
c19ae5d
Revisit equals, hashCode and toString
lhotari Jan 31, 2024
8928f98
Refactor test case
lhotari Feb 1, 2024
19e33ca
Add support for UnpooledHeapByteBuf.getBytes which passes an array
lhotari Feb 1, 2024
86d002d
Add support for visiting buffers backed by byte[] arrays
lhotari Feb 1, 2024
e793872
Move ByteBufVisitor to org.apache.bookkeeper.util package
lhotari Feb 1, 2024
6508bf5
Update javadoc
lhotari Feb 1, 2024
5cd2dd8
Refactor to use stateless visitor so that instance can be shared
lhotari Feb 1, 2024
fc93b5e
Improve test so that a single scenario can be used for debugging
lhotari Feb 2, 2024
272e962
Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == …
lhotari Feb 2, 2024
b4d0b18
Fix checkstyle
lhotari Feb 2, 2024
9c1b68f
Fix checkstyle
lhotari Feb 2, 2024
a751294
Fix missing depth increment that prevents StackOverflowException
lhotari Feb 3, 2024
5f8fef5
Properly handle the depth increase and decrease
lhotari Feb 3, 2024
bb703ab
Remove unnecessary condition
lhotari Feb 3, 2024
2b79716
Use more efficient way to read bytes to the target array
lhotari Feb 3, 2024
f650b9b
Don't use ByteBufVisitor if it's not necessary
lhotari Feb 3, 2024
d21a5cd
Revert "Fix bug in Java9IntHash calculation that assumed crc32c_updat…
lhotari Feb 3, 2024
a9ed5e4
Fix issue in resume byte[] version that was added
lhotari Feb 3, 2024
b7d5795
Polish ByteBufVisitor
lhotari Feb 4, 2024
34c1c8a
Use extracted method
lhotari Feb 4, 2024
be20948
Fix bug with array handling
lhotari Feb 4, 2024
7998659
Polish ByteBufVisitor
lhotari Feb 4, 2024
b691316
Optimize the buffer copying in the case where array or memory address…
lhotari Feb 4, 2024
0af1556
Check if memory address is accepted
lhotari Feb 4, 2024
317aab7
Improve comments about complement (current = ~current) in resume
lhotari Feb 4, 2024
2c73dca
Print thread dump when build is cancelled
lhotari Feb 1, 2024
c215db8
Filter empty buffers and arrays in ByteBufVisitor
lhotari Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add a test that reproduces a bug in checksum calculation
lhotari committed Jan 31, 2024
commit 9fb47f5dcf3848320991c7a2cf602eb325893a42
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package org.apache.bookkeeper.proto.checksum;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import com.scurrilous.circe.checksum.IntHash;
import com.scurrilous.circe.checksum.Java8IntHash;
import com.scurrilous.circe.checksum.Java9IntHash;
import com.scurrilous.circe.checksum.JniIntHash;
import com.scurrilous.circe.crc.Sse42Crc32C;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import java.util.Arrays;
import java.util.Collection;
import org.apache.bookkeeper.proto.BookieProtoEncoding;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
* Reproduces a bug in the checksum calculation when the payload is
* a CompositeByteBuf and this buffer has a reader index state other than 0.
* The reader index state gets lost in the unwrapping process.
*
* There are at least 2 different bugs. One that occurs when the
* payload is >= BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD and the other when
* it's < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD.
* This test covers both useV2Protocol=true and useV2Protocol=false since the bug is triggered differently.
*/
@RunWith(Parameterized.class)
public class CompositeByteBufUnwrapBugReproduceTest {
final byte[] testPayLoad;
final int bufferPrefixLength;
private final boolean useV2Protocol;

@Parameterized.Parameters
public static Collection<Object[]> testScenarios() {
return Arrays.asList(new Object[][] {
{BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, true},
{BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, false},
{BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, true},
{BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, false}
});
}

public CompositeByteBufUnwrapBugReproduceTest(int payloadSize, boolean useV2Protocol) {
this.testPayLoad = createTestPayLoad(payloadSize);
this.bufferPrefixLength = payloadSize / 7;
this.useV2Protocol = useV2Protocol;
}

private static byte[] createTestPayLoad(int payloadSize) {
byte[] payload = new byte[payloadSize];
for (int i = 0; i < payloadSize; i++) {
payload[i] = (byte) i;
}
return payload;
}


/**
* A DigestManager that uses the given IntHash implementation for testing.
*/
static class TestIntHashDigestManager extends DigestManager {
private final IntHash intHash;

public TestIntHashDigestManager(IntHash intHash, long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
super(ledgerId, useV2Protocol, allocator);
this.intHash = intHash;
}

@Override
int getMacCodeLength() {
return 4;
}

@Override
boolean isInt32Digest() {
return true;
}

@Override
void populateValueAndReset(int digest, ByteBuf buf) {
buf.writeInt(digest);
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
return intHash.resume(digest, data, offset, len);
}
}

@Test
public void shouldCalculateChecksumForCompositeBuffer() {
ByteBuf testPayload = Unpooled.wrappedBuffer(testPayLoad);
byte[] referenceOutput = computeDigestAndPackageForSending(new Java8IntHash(), testPayload.retainedDuplicate());
assertDigestAndPackageMatchesReference(new Java8IntHash(), testPayload, referenceOutput);
assertDigestAndPackageMatchesReference(new Java9IntHash(), testPayload, referenceOutput);
if (Sse42Crc32C.isSupported()) {
assertDigestAndPackageMatchesReference(new JniIntHash(), testPayload, referenceOutput);
}
testPayload.release();
}

private void assertDigestAndPackageMatchesReference(IntHash intHash, ByteBuf payload, byte[] referenceOutput) {
byte[] output1 = computeDigestAndPackageForSending(intHash, payload.retainedDuplicate());
assertArrayEquals(referenceOutput, output1);

// wrap the payload in a CompositeByteBuf which contains another buffer where the payload contains a prefix
// the reader index is set past the prefix
ByteBuf payload2 = wrapWithPrefixAndCompositeByteBufWithReaderIndexState(payload.retainedDuplicate());
// this validates that the readable bytes in payload2 match the TEST_PAYLOAD content
assertArrayEquals(ByteBufUtil.getBytes(payload2.duplicate()), testPayLoad);

byte[] output2 = computeDigestAndPackageForSending(intHash, payload2);
assertArrayEquals(output1, output2);
}

private byte[] computeDigestAndPackageForSending(IntHash intHash, ByteBuf data) {
DigestManager digestManager = new TestIntHashDigestManager(intHash, 1, useV2Protocol, ByteBufAllocator.DEFAULT);
ReferenceCounted packagedBuffer =
digestManager.computeDigestAndPackageForSending(1, 0, data.readableBytes(), data,
MacDigestManager.EMPTY_LEDGER_KEY, BookieProtocol.FLAG_NONE);
return packagedBufferToBytes(packagedBuffer);
}

ByteBuf wrapWithPrefixAndCompositeByteBufWithReaderIndexState(ByteBuf payload) {
// create a new buffer with a prefix and the actual payload
ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes());
prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength));
prefixedPayload.writeBytes(payload);

// wrap the buffer in a composite buffer
CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer();
outerComposite.addComponent(true, prefixedPayload);

// set reader index state. this is the state that gets lost in the unwrapping process
outerComposite.readerIndex(bufferPrefixLength);

return outerComposite;
}

private static byte[] packagedBufferToBytes(ReferenceCounted packagedBuffer) {
byte[] output;
if (packagedBuffer instanceof ByteBufList) {
ByteBufList bufList = (ByteBufList) packagedBuffer;
output = new byte[bufList.readableBytes()];
bufList.getBytes(output);
for (int i = 0; i < bufList.size(); i++) {
bufList.getBuffer(i).release();
}
} else if (packagedBuffer instanceof ByteBuf) {
output = ByteBufUtil.getBytes((ByteBuf) packagedBuffer);
packagedBuffer.release();
} else {
throw new RuntimeException("Unexpected type: " + packagedBuffer.getClass());
}
return output;
}
}