Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix negative position read on BufferedChannel cause data can't read problem. #4021

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,25 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept
length -= bytesToCopy;
// let's read it
} else {
readBufferStartPosition = pos;
int readBytes = 0;
try {
// We don't have it in the buffer, so put necessary data in the buffer
readBufferStartPosition = pos;
readBytes = validateAndGetFileChannel()
.read(readBuffer.internalNioBuffer(0, readCapacity), readBufferStartPosition);

int readBytes = fileChannel.read(readBuffer.internalNioBuffer(0, readCapacity),
readBufferStartPosition);
if (readBytes <= 0) {
readBuffer.writerIndex(readBytes);
} catch (Exception e) {
readBufferStartPosition = Long.MIN_VALUE;
readBuffer.clear();
throw e;
}

if (readBytes < 0) {
readBufferStartPosition = Long.MIN_VALUE;
readBuffer.clear();
throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
}
readBuffer.writerIndex(readBytes);
}
}
return (int) (pos - prevPos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,25 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept
// here we reached eof.
break;
} else {
// We don't have it in the buffer, so put necessary data in the buffer
readBufferStartPosition = currentPosition;
int readBytes = 0;
if ((readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity),
currentPosition)) <= 0) {
try {
// We don't have it in the buffer, so put necessary data in the buffer
readBufferStartPosition = currentPosition;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can check the pos in advance; if it is negative, throw an exception directly. This pr will invalidate the read buffer cache. Maybe we can reuse the read buffer cache at next time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to method early statement to protect negative pos read.

readBytes = validateAndGetFileChannel()
.read(readBuffer.internalNioBuffer(0, readCapacity), readBufferStartPosition);

readBuffer.writerIndex(readBytes);
} catch (Exception e) {
readBufferStartPosition = Long.MIN_VALUE;
readBuffer.clear();
throw e;
}

if (readBytes < 0) {
readBufferStartPosition = Long.MIN_VALUE;
readBuffer.clear();
throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
}
readBuffer.writerIndex(readBytes);
}
}
return (int) (currentPosition - pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,8 @@ EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOExce
throw new IOException("Old log file header without ledgers map on entryLogId " + entryLogId);
}

if (header.ledgersMapOffset == 0L) {
// for some unknown reason this offset maybe -1.
if (header.ledgersMapOffset <= 0L) {
// The index was not stored in the log file (possibly because the bookie crashed before flushing it)
throw new IOException("No ledgers map index found on entryLogId " + entryLogId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

package org.apache.bookkeeper.bookie;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.Random;
Expand Down Expand Up @@ -70,6 +72,127 @@ public void testBufferedChannelFlushForceWrite() throws Exception {
testBufferedChannel(5000, 30, 0, true, true);
}

@Test
public void testReadBufferStartPositionWhenBufferedChannalThrowIOException() throws Exception {
File newLogFile = File.createTempFile("test", "log");
newLogFile.deleteOnExit();
FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel();

BufferedChannel logChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel,
INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, 0);

ByteBuf data = Unpooled.buffer(1024, 1024);

int totalIntNumber = 1024 / 4;
for (int i = 0; i < totalIntNumber; i++) {
data.writeInt(i);
}

logChannel.write(data);

ByteBuf readDst = Unpooled.buffer(1024, 1024);
try {
logChannel.read(readDst, -1);
} catch (Exception e) {
// do nothing.
}

// should reset readBuffer when IOException throws
Assert.assertEquals(Long.MIN_VALUE, logChannel.readBufferStartPosition);
Assert.assertEquals(0, logChannel.readBuffer.readableBytes());

readDst.clear();

logChannel.read(readDst, 0);

for (int i = 0; i < totalIntNumber; i++) {
Assert.assertEquals(readDst.readInt(), i);
}

BufferedReadChannel logReadChannel = new BufferedReadChannel(fileChannel, INTERNAL_BUFFER_READ_CAPACITY);
readDst.clear();

try {
logReadChannel.read(readDst, -1);
} catch (Exception e) {
// do nothing.
}

// should reset readBuffer when IOException throws
Assert.assertEquals(Long.MIN_VALUE, logReadChannel.readBufferStartPosition);
Assert.assertEquals(0, logReadChannel.readBuffer.readableBytes());

readDst.clear();

logChannel.read(readDst, 0);

for (int i = 0; i < totalIntNumber; i++) {
Assert.assertEquals(readDst.readInt(), i);
}

}

@Test
public void testBufferReadChannelCacheReadPosition() throws IOException {
File newLogFile = File.createTempFile("testLargeRead", "log");
newLogFile.deleteOnExit();
FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel();

BufferedChannel writeChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel,
INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, 0);

BufferedReadChannel readChannel = new BufferedReadChannel(fileChannel, INTERNAL_BUFFER_READ_CAPACITY);

// fill 1MB data

ByteBuf buf = Unpooled.directBuffer(1024 * 1024);

int totalIntNumber = 1024 * 1024 / 4;

for (int i = 0; i < totalIntNumber; i++) {
buf.writeInt(i);
}

writeChannel.write(buf);

writeChannel.flushAndForceWrite(false);

buf.clear();

// test on BufferedReadChannel and BufferedChannel

ByteBuf internalReadCapacity = Unpooled.directBuffer(INTERNAL_BUFFER_READ_CAPACITY);

for (BufferedReadChannel channel : Lists.newArrayList(readChannel, writeChannel)) {
internalReadCapacity.clear();
buf.clear();

// trigger first read in `INTERNAL_BUFFER_READ_CAPACITY` bytes
channel.read(internalReadCapacity, 0);

// check if the readPosition is cached.
Assert.assertEquals(0, channel.readBufferStartPosition);

// try to a large read this should read some bytes from readBuffer and some read from fileChannel.
channel.read(buf, 0);

for (int i = 0; i < totalIntNumber; i++) {
Assert.assertEquals(buf.readInt(), i);
}

// check if the readPosition is update and cached.
Assert.assertEquals(1024 * 1024 - INTERNAL_BUFFER_READ_CAPACITY, channel.readBufferStartPosition);
}

buf.release();
internalReadCapacity.release();

writeChannel.clear();
readChannel.clear();

writeChannel.close();
}

public void testBufferedChannel(int byteBufLength, int numOfWrites, int unpersistedBytesBound, boolean flush,
boolean shouldForceWrite) throws Exception {
File newLogFile = File.createTempFile("test", "log");
Expand Down