From caf275e66c0b97fe2aa2764ab5a9e3bedbb78eb0 Mon Sep 17 00:00:00 2001 From: wangjinlong Date: Sat, 8 Jul 2023 23:12:06 +0800 Subject: [PATCH 1/5] fix negative position read on BufferedChannel cause data can't read problem --- .../bookkeeper/bookie/BufferedChannel.java | 21 +++++-- .../bookie/BufferedReadChannel.java | 21 +++++-- .../bookkeeper/bookie/DefaultEntryLogger.java | 3 +- .../bookie/BufferedChannelTest.java | 60 +++++++++++++++++++ 4 files changed, 94 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java index 3197165827a..1979fb80f9a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java @@ -269,14 +269,25 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept length -= bytesToCopy; // let's read it } else { - readBufferStartPosition = pos; + int readBytes = 0; + try { + // We don't have it in the buffer, so put necessary data in the buffer + readBufferStartPosition = pos; + readBytes = validateAndGetFileChannel() + .read(readBuffer.internalNioBuffer(0, readCapacity), readBufferStartPosition); - int readBytes = fileChannel.read(readBuffer.internalNioBuffer(0, readCapacity), - readBufferStartPosition); - if (readBytes <= 0) { + readBuffer.writerIndex(readBytes); + } catch (Exception e) { + readBufferStartPosition = Long.MIN_VALUE; + readBuffer.clear(); + throw e; + } + + if (readBytes < 0) { + readBufferStartPosition = Long.MIN_VALUE; + readBuffer.clear(); throw new IOException("Reading from filechannel returned a non-positive value. Short read."); } - readBuffer.writerIndex(readBytes); } } return (int) (pos - prevPos); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index 22f5a81690d..a3d252e1bbd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -86,14 +86,25 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept // here we reached eof. break; } else { - // We don't have it in the buffer, so put necessary data in the buffer - readBufferStartPosition = currentPosition; int readBytes = 0; - if ((readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity), - currentPosition)) <= 0) { + try { + // We don't have it in the buffer, so put necessary data in the buffer + readBufferStartPosition = pos; + readBytes = validateAndGetFileChannel() + .read(readBuffer.internalNioBuffer(0, readCapacity), readBufferStartPosition); + + readBuffer.writerIndex(readBytes); + } catch (Exception e) { + readBufferStartPosition = Long.MIN_VALUE; + readBuffer.clear(); + throw e; + } + + if (readBytes < 0) { + readBufferStartPosition = Long.MIN_VALUE; + readBuffer.clear(); throw new IOException("Reading from filechannel returned a non-positive value. Short read."); } - readBuffer.writerIndex(readBytes); } } return (int) (currentPosition - pos); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 64c6508d67d..448346d8686 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -1052,7 +1052,8 @@ EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOExce throw new IOException("Old log file header without ledgers map on entryLogId " + entryLogId); } - if (header.ledgersMapOffset == 0L) { + // for some unknown reason this offset maybe -1. + if (header.ledgersMapOffset <= 0L) { // The index was not stored in the log file (possibly because the bookie crashed before flushing it) throw new IOException("No ledgers map index found on entryLogId " + entryLogId); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java index cd3e34d35e3..ceb4ed338f1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java @@ -70,6 +70,66 @@ public void testBufferedChannelFlushForceWrite() throws Exception { testBufferedChannel(5000, 30, 0, true, true); } + @Test + public void testReadBufferStartPositionWhenBufferedChannalThrowIOException() throws Exception { + File newLogFile = File.createTempFile("test", "log"); + newLogFile.deleteOnExit(); + FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel(); + + BufferedChannel logChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel, + INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, 0); + + ByteBuf data = Unpooled.buffer(1024, 1024); + + int totalIntNumber = 1024 / 4; + for (int i = 0; i < totalIntNumber; i++) { + data.writeInt(i); + } + + logChannel.write(data); + + ByteBuf readDst = Unpooled.buffer(1024, 1024); + try { + logChannel.read(readDst, -1); + } catch (Exception e) { + // do nothing. + } + + // should reset readBuffer when IOException throws + Assert.assertEquals(Long.MIN_VALUE, logChannel.readBufferStartPosition); + Assert.assertEquals(0, logChannel.readBuffer.readableBytes()); + + readDst.clear(); + + logChannel.read(readDst, 0); + + for (int i = 0; i < totalIntNumber; i++) { + Assert.assertEquals(readDst.readInt(), i); + } + + BufferedReadChannel logReadChannel = new BufferedReadChannel(fileChannel, INTERNAL_BUFFER_READ_CAPACITY); + readDst.clear(); + + try { + logReadChannel.read(readDst, -1); + } catch (Exception e) { + // do nothing. + } + + // should reset readBuffer when IOException throws + Assert.assertEquals(Long.MIN_VALUE, logReadChannel.readBufferStartPosition); + Assert.assertEquals(0, logReadChannel.readBuffer.readableBytes()); + + readDst.clear(); + + logChannel.read(readDst, 0); + + for (int i = 0; i < totalIntNumber; i++) { + Assert.assertEquals(readDst.readInt(), i); + } + + } + public void testBufferedChannel(int byteBufLength, int numOfWrites, int unpersistedBytesBound, boolean flush, boolean shouldForceWrite) throws Exception { File newLogFile = File.createTempFile("test", "log"); From 3e2869d1a6bfa4f2f79c12e9a7e154487ba61723 Mon Sep 17 00:00:00 2001 From: WJL3333 Date: Sun, 16 Jul 2023 00:15:17 +0800 Subject: [PATCH 2/5] fix bug --- .../java/org/apache/bookkeeper/bookie/BufferedReadChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index a3d252e1bbd..59f579e6c4d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -89,7 +89,7 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept int readBytes = 0; try { // We don't have it in the buffer, so put necessary data in the buffer - readBufferStartPosition = pos; + readBufferStartPosition = currentPosition; readBytes = validateAndGetFileChannel() .read(readBuffer.internalNioBuffer(0, readCapacity), readBufferStartPosition); From e0ad8649242153ddcdc636be887778076a511424 Mon Sep 17 00:00:00 2001 From: WJL3333 Date: Sun, 16 Jul 2023 00:34:59 +0800 Subject: [PATCH 3/5] add unit test on BufferedChannel readPosition --- .../bookie/BufferedChannelTest.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java index ceb4ed338f1..c4d008979a8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java @@ -21,10 +21,13 @@ package org.apache.bookkeeper.bookie; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import java.io.File; +import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.Random; @@ -130,6 +133,67 @@ public void testReadBufferStartPositionWhenBufferedChannalThrowIOException() thr } + @Test + public void testBufferReadChannelCacheReadPosition() throws IOException { + File newLogFile = File.createTempFile("testLargeRead", "log"); + newLogFile.deleteOnExit(); + FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel(); + + BufferedChannel writeChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel, + INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, 0); + + BufferedReadChannel readChannel = new BufferedReadChannel(fileChannel, INTERNAL_BUFFER_READ_CAPACITY); + + // fill 1MB data + + ByteBuf buf = Unpooled.directBuffer(1024 * 1024); + + int totalIntNumber = 1024 * 1024 / 4; + + for (int i = 0; i < totalIntNumber; i++) { + buf.writeInt(i); + } + + writeChannel.write(buf); + + writeChannel.flushAndForceWrite(false); + + buf.clear(); + + // test on BufferedReadChannel and BufferedChannel + + ByteBuf internalReadCapacity = Unpooled.directBuffer(INTERNAL_BUFFER_READ_CAPACITY); + + for (BufferedReadChannel channel : Lists.newArrayList(readChannel, writeChannel)) { + internalReadCapacity.clear(); + buf.clear(); + + // trigger first read in `INTERNAL_BUFFER_READ_CAPACITY` bytes + channel.read(internalReadCapacity, 0); + + // check if the readPosition is cached. + Assert.assertEquals(0, channel.readBufferStartPosition); + + // try to a large read this should read some bytes from readBuffer and some read from fileChannel. + channel.read(buf, 0); + + for (int i = 0; i < totalIntNumber; i++) { + Assert.assertEquals(buf.readInt(), i); + } + + // check if the readPosition is update and cached. + Assert.assertEquals(1024 * 1024 - INTERNAL_BUFFER_READ_CAPACITY, channel.readBufferStartPosition); + } + + buf.release(); + internalReadCapacity.release(); + + writeChannel.clear(); + readChannel.clear(); + + writeChannel.close(); + } + public void testBufferedChannel(int byteBufLength, int numOfWrites, int unpersistedBytesBound, boolean flush, boolean shouldForceWrite) throws Exception { File newLogFile = File.createTempFile("test", "log"); From f3be23bdf0a467f763a25f4161653c38d3a3fbd2 Mon Sep 17 00:00:00 2001 From: WJL3333 Date: Sun, 16 Jul 2023 00:35:51 +0800 Subject: [PATCH 4/5] remove unnessary import --- .../java/org/apache/bookkeeper/bookie/BufferedChannelTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java index c4d008979a8..5112d62e2d7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java @@ -22,7 +22,6 @@ package org.apache.bookkeeper.bookie; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; From a6f155ad31983f1531548e2816fad2f1bb351574 Mon Sep 17 00:00:00 2001 From: wangjinlong Date: Tue, 25 Jul 2023 12:51:31 +0800 Subject: [PATCH 5/5] add negative pos protection --- .../java/org/apache/bookkeeper/bookie/BufferedChannel.java | 5 +++++ .../org/apache/bookkeeper/bookie/BufferedReadChannel.java | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java index 1979fb80f9a..222e643ed42 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java @@ -243,6 +243,11 @@ public long forceWrite(boolean forceMetadata) throws IOException { @Override public synchronized int read(ByteBuf dest, long pos, int length) throws IOException { + // protect negative position read + if (pos < 0) { + throw new IllegalArgumentException("Negative position pos:" + pos); + } + long prevPos = pos; while (length > 0) { // check if it is in the write buffer diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index 59f579e6c4d..7511588fecd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -72,6 +72,12 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept if (pos >= eof) { return -1; } + + // protect negative position read + if (pos < 0) { + throw new IllegalArgumentException("Negative position pos:" + pos); + } + while (length > 0) { // Check if the data is in the buffer, if so, copy it. if (readBufferStartPosition <= currentPosition