diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java index 3197165827a..222e643ed42 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java @@ -243,6 +243,11 @@ public long forceWrite(boolean forceMetadata) throws IOException { @Override public synchronized int read(ByteBuf dest, long pos, int length) throws IOException { + // protect negative position read + if (pos < 0) { + throw new IllegalArgumentException("Negative position pos:" + pos); + } + long prevPos = pos; while (length > 0) { // check if it is in the write buffer @@ -269,14 +274,25 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept length -= bytesToCopy; // let's read it } else { - readBufferStartPosition = pos; + int readBytes = 0; + try { + // We don't have it in the buffer, so put necessary data in the buffer + readBufferStartPosition = pos; + readBytes = validateAndGetFileChannel() + .read(readBuffer.internalNioBuffer(0, readCapacity), readBufferStartPosition); + + readBuffer.writerIndex(readBytes); + } catch (Exception e) { + readBufferStartPosition = Long.MIN_VALUE; + readBuffer.clear(); + throw e; + } - int readBytes = fileChannel.read(readBuffer.internalNioBuffer(0, readCapacity), - readBufferStartPosition); - if (readBytes <= 0) { + if (readBytes < 0) { + readBufferStartPosition = Long.MIN_VALUE; + readBuffer.clear(); throw new IOException("Reading from filechannel returned a non-positive value. Short read."); } - readBuffer.writerIndex(readBytes); } } return (int) (pos - prevPos); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index 22f5a81690d..7511588fecd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -72,6 +72,12 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept if (pos >= eof) { return -1; } + + // protect negative position read + if (pos < 0) { + throw new IllegalArgumentException("Negative position pos:" + pos); + } + while (length > 0) { // Check if the data is in the buffer, if so, copy it. if (readBufferStartPosition <= currentPosition @@ -86,14 +92,25 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept // here we reached eof. break; } else { - // We don't have it in the buffer, so put necessary data in the buffer - readBufferStartPosition = currentPosition; int readBytes = 0; - if ((readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity), - currentPosition)) <= 0) { + try { + // We don't have it in the buffer, so put necessary data in the buffer + readBufferStartPosition = currentPosition; + readBytes = validateAndGetFileChannel() + .read(readBuffer.internalNioBuffer(0, readCapacity), readBufferStartPosition); + + readBuffer.writerIndex(readBytes); + } catch (Exception e) { + readBufferStartPosition = Long.MIN_VALUE; + readBuffer.clear(); + throw e; + } + + if (readBytes < 0) { + readBufferStartPosition = Long.MIN_VALUE; + readBuffer.clear(); throw new IOException("Reading from filechannel returned a non-positive value. Short read."); } - readBuffer.writerIndex(readBytes); } } return (int) (currentPosition - pos); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 64c6508d67d..448346d8686 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -1052,7 +1052,8 @@ EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOExce throw new IOException("Old log file header without ledgers map on entryLogId " + entryLogId); } - if (header.ledgersMapOffset == 0L) { + // for some unknown reason this offset maybe -1. + if (header.ledgersMapOffset <= 0L) { // The index was not stored in the log file (possibly because the bookie crashed before flushing it) throw new IOException("No ledgers map index found on entryLogId " + entryLogId); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java index cd3e34d35e3..5112d62e2d7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java @@ -21,10 +21,12 @@ package org.apache.bookkeeper.bookie; +import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import java.io.File; +import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.Random; @@ -70,6 +72,127 @@ public void testBufferedChannelFlushForceWrite() throws Exception { testBufferedChannel(5000, 30, 0, true, true); } + @Test + public void testReadBufferStartPositionWhenBufferedChannalThrowIOException() throws Exception { + File newLogFile = File.createTempFile("test", "log"); + newLogFile.deleteOnExit(); + FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel(); + + BufferedChannel logChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel, + INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, 0); + + ByteBuf data = Unpooled.buffer(1024, 1024); + + int totalIntNumber = 1024 / 4; + for (int i = 0; i < totalIntNumber; i++) { + data.writeInt(i); + } + + logChannel.write(data); + + ByteBuf readDst = Unpooled.buffer(1024, 1024); + try { + logChannel.read(readDst, -1); + } catch (Exception e) { + // do nothing. + } + + // should reset readBuffer when IOException throws + Assert.assertEquals(Long.MIN_VALUE, logChannel.readBufferStartPosition); + Assert.assertEquals(0, logChannel.readBuffer.readableBytes()); + + readDst.clear(); + + logChannel.read(readDst, 0); + + for (int i = 0; i < totalIntNumber; i++) { + Assert.assertEquals(readDst.readInt(), i); + } + + BufferedReadChannel logReadChannel = new BufferedReadChannel(fileChannel, INTERNAL_BUFFER_READ_CAPACITY); + readDst.clear(); + + try { + logReadChannel.read(readDst, -1); + } catch (Exception e) { + // do nothing. + } + + // should reset readBuffer when IOException throws + Assert.assertEquals(Long.MIN_VALUE, logReadChannel.readBufferStartPosition); + Assert.assertEquals(0, logReadChannel.readBuffer.readableBytes()); + + readDst.clear(); + + logChannel.read(readDst, 0); + + for (int i = 0; i < totalIntNumber; i++) { + Assert.assertEquals(readDst.readInt(), i); + } + + } + + @Test + public void testBufferReadChannelCacheReadPosition() throws IOException { + File newLogFile = File.createTempFile("testLargeRead", "log"); + newLogFile.deleteOnExit(); + FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel(); + + BufferedChannel writeChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel, + INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, 0); + + BufferedReadChannel readChannel = new BufferedReadChannel(fileChannel, INTERNAL_BUFFER_READ_CAPACITY); + + // fill 1MB data + + ByteBuf buf = Unpooled.directBuffer(1024 * 1024); + + int totalIntNumber = 1024 * 1024 / 4; + + for (int i = 0; i < totalIntNumber; i++) { + buf.writeInt(i); + } + + writeChannel.write(buf); + + writeChannel.flushAndForceWrite(false); + + buf.clear(); + + // test on BufferedReadChannel and BufferedChannel + + ByteBuf internalReadCapacity = Unpooled.directBuffer(INTERNAL_BUFFER_READ_CAPACITY); + + for (BufferedReadChannel channel : Lists.newArrayList(readChannel, writeChannel)) { + internalReadCapacity.clear(); + buf.clear(); + + // trigger first read in `INTERNAL_BUFFER_READ_CAPACITY` bytes + channel.read(internalReadCapacity, 0); + + // check if the readPosition is cached. + Assert.assertEquals(0, channel.readBufferStartPosition); + + // try to a large read this should read some bytes from readBuffer and some read from fileChannel. + channel.read(buf, 0); + + for (int i = 0; i < totalIntNumber; i++) { + Assert.assertEquals(buf.readInt(), i); + } + + // check if the readPosition is update and cached. + Assert.assertEquals(1024 * 1024 - INTERNAL_BUFFER_READ_CAPACITY, channel.readBufferStartPosition); + } + + buf.release(); + internalReadCapacity.release(); + + writeChannel.clear(); + readChannel.clear(); + + writeChannel.close(); + } + public void testBufferedChannel(int byteBufLength, int numOfWrites, int unpersistedBytesBound, boolean flush, boolean shouldForceWrite) throws Exception { File newLogFile = File.createTempFile("test", "log");