Skip to content

Commit

Permalink
[BP-62] Bookie server introduces the BatchedReadEntryProcessor to han…
Browse files Browse the repository at this point in the history
…dle batch read request (#4187)

Descriptions of the changes in this PR:
This is the second PR for the batch read(#4051) feature.

The bookie server introduces the BatchedReadEntryProcessor to handle batch read requests.
  • Loading branch information
horizonzy authored Jan 24, 2024
1 parent 92e41c1 commit 0b0504f
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati

protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord";

protected static final String MAX_BATCH_READ_SIZE = "maxBatchReadSize";
protected static final int DEFAULT_MAX_BATCH_READ_SIZE = 5 * 1024 * 1024; // 5MB

/**
* Construct a default configuration object.
*/
Expand Down Expand Up @@ -4126,4 +4129,24 @@ public ServerConfiguration setOperationMaxNumbersInSingleRocksDBWriteBatch(int m
public int getMaxOperationNumbersInSingleRocksDBBatch() {
return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 100000);
}

/**
* Set the maximum size, in bytes, that the bookie allows for a single batched read.
*
* <p>The value is stored under the {@code maxBatchReadSize} configuration key.
*
* @param maxBatchReadSize
*          the maximum batched read size, in bytes
* @return this configuration object, so that calls can be chained
*/
public ServerConfiguration setMaxBatchReadSize(long maxBatchReadSize) {
this.setProperty(MAX_BATCH_READ_SIZE, maxBatchReadSize);
return this;
}

/**
* Get the maximum size, in bytes, that the bookie allows for a single batched read.
*
* @return the value configured under {@code maxBatchReadSize}, falling back to
*         {@code DEFAULT_MAX_BATCH_READ_SIZE} (5 * 1024 * 1024 bytes, i.e. 5MB)
*/
public long getMaxBatchReadSize() {
return this.getLong(MAX_BATCH_READ_SIZE, DEFAULT_MAX_BATCH_READ_SIZE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.proto;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCounted;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.proto.BookieProtocol.BatchedReadRequest;
import org.apache.bookkeeper.util.ByteBufList;

/**
 * Processor for a {@link BatchedReadRequest}: starting at the requested entry id it reads
 * consecutive entries of the ledger and returns them together in one {@link ByteBufList}.
 *
 * <p>Instances are pooled through a Netty {@link Recycler}; obtain one with {@code create(...)}
 * instead of constructing it directly.
 */
public class BatchedReadEntryProcessor extends ReadEntryProcessor {

    private static final Recycler<BatchedReadEntryProcessor> RECYCLER = new Recycler<BatchedReadEntryProcessor>() {
        @Override
        protected BatchedReadEntryProcessor newObject(Recycler.Handle<BatchedReadEntryProcessor> handle) {
            return new BatchedReadEntryProcessor(handle);
        }
    };

    /** Handle used to give this instance back to {@link #RECYCLER}. */
    private final Recycler.Handle<BatchedReadEntryProcessor> recyclerHandle;

    /** Server-side cap, in bytes, on the size of one batched read response. */
    private long maxBatchReadSize;

    private BatchedReadEntryProcessor(Recycler.Handle<BatchedReadEntryProcessor> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    /**
     * Take a processor from the pool and initialize it for the given request.
     *
     * <p>Also notifies {@code requestProcessor} that a read request has started on the
     * channel of {@code requestHandler}.
     *
     * @param request the batched read request to serve
     * @param requestHandler the handler that received the request
     * @param requestProcessor the request processor that owns this operation
     * @param fenceThreadPool stored in the inherited {@code fenceThreadPool} field; may be {@code null}
     * @param throttleReadResponses stored in the inherited {@code throttleReadResponses} field
     * @param maxBatchReadSize server-side cap, in bytes, on the size of the response
     * @return the initialized processor
     */
    public static BatchedReadEntryProcessor create(BatchedReadRequest request,
                                                   BookieRequestHandler requestHandler,
                                                   BookieRequestProcessor requestProcessor,
                                                   ExecutorService fenceThreadPool,
                                                   boolean throttleReadResponses,
                                                   long maxBatchReadSize) {
        BatchedReadEntryProcessor rep = RECYCLER.get();
        rep.init(request, requestHandler, requestProcessor);
        rep.fenceThreadPool = fenceThreadPool;
        rep.throttleReadResponses = throttleReadResponses;
        rep.maxBatchReadSize = maxBatchReadSize;
        requestProcessor.onReadRequestStart(requestHandler.ctx().channel());
        return rep;
    }

    /**
     * Read as many consecutive entries as the request and the server limits allow.
     *
     * <p>Rules applied by the loop:
     * <ul>
     *   <li>a non-positive max count in the request means "no count limit";</li>
     *   <li>the size limit is the smaller of the request's max size and {@code maxBatchReadSize};</li>
     *   <li>the first entry is always returned, even if it alone exceeds the size limit;</li>
     *   <li>a later entry that would push the frame over the size limit is released and ends the batch;</li>
     *   <li>a failure while reading a later entry ends the batch and the entries collected so far
     *       are returned.</li>
     * </ul>
     *
     * @return a {@link ByteBufList} holding at least one entry
     * @throws Exception if reading the first entry fails
     */
    @Override
    protected ReferenceCounted readData() throws Exception {
        ByteBufList data = null;
        BatchedReadRequest batchRequest = (BatchedReadRequest) request;
        int maxCount = batchRequest.getMaxCount();
        if (maxCount <= 0) {
            maxCount = Integer.MAX_VALUE;
        }
        long maxSize = Math.min(batchRequest.getMaxSize(), maxBatchReadSize);
        // See BookieProtoEncoding.ResponseEnDeCoderPreV3#encode on BatchedReadResponse case.
        // Fixed overhead of the response frame; the extra 4 added per entry below is
        // presumably that entry's length prefix -- confirm against the encoder.
        long frameSize = 24 + 8 + 4;
        for (int i = 0; i < maxCount; i++) {
            try {
                ByteBuf entry = requestProcessor.getBookie()
                        .readEntry(request.getLedgerId(), request.getEntryId() + i);
                frameSize += entry.readableBytes() + 4;
                if (data == null) {
                    // The first entry is accepted unconditionally so the response is never empty.
                    data = ByteBufList.get(entry);
                } else if (frameSize > maxSize) {
                    // This entry does not fit: drop our reference and stop batching.
                    entry.release();
                    break;
                } else {
                    data.add(entry);
                }
            } catch (Throwable e) {
                if (data == null) {
                    // Nothing was read yet, so let the caller map the failure to an error response.
                    throw e;
                }
                // At least one entry was read: return the partial batch instead of failing.
                break;
            }
        }
        return data;
    }

    /**
     * Build the success response for the batch.
     *
     * @param data the value returned by {@link #readData()}, cast to {@link ByteBufList}
     * @return the batched read response for the current request
     */
    @Override
    protected BookieProtocol.Response buildReadResponse(ReferenceCounted data) {
        return ResponseBuilder.buildBatchedReadResponse((ByteBufList) data, (BatchedReadRequest) request);
    }

    /**
     * Recycle the request, reset the inherited processor state and, when a recycler handle is
     * present, return this instance to the pool.
     */
    @Override
    protected void recycle() {
        request.recycle();
        super.reset();
        if (this.recyclerHandle != null) {
            this.recyclerHandle.recycle(this);
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ boolean isFencing() {

private final Handle<ReadRequest> recyclerHandle;

private ReadRequest() {
protected ReadRequest() {
recyclerHandle = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,11 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Bo
private void processReadRequest(final BookieProtocol.ReadRequest r, final BookieRequestHandler requestHandler) {
ExecutorService fenceThreadPool =
null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(requestHandler.ctx());
ReadEntryProcessor read = ReadEntryProcessor.create(r, requestHandler,
this, fenceThreadPool, throttleReadResponses);
ReadEntryProcessor read = r instanceof BookieProtocol.BatchedReadRequest
? BatchedReadEntryProcessor.create((BookieProtocol.BatchedReadRequest) r, requestHandler,
this, fenceThreadPool, throttleReadResponses, serverCfg.getMaxBatchReadSize())
: ReadEntryProcessor.create(r, requestHandler,
this, fenceThreadPool, throttleReadResponses);

// If it's a high priority read (fencing or as part of recovery process), we want to make sure it
// gets executed as fast as possible, so bypass the normal readThreadPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -39,8 +40,8 @@
class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
private static final Logger LOG = LoggerFactory.getLogger(ReadEntryProcessor.class);

private ExecutorService fenceThreadPool;
private boolean throttleReadResponses;
protected ExecutorService fenceThreadPool;
protected boolean throttleReadResponses;

public static ReadEntryProcessor create(ReadRequest request,
BookieRequestHandler requestHandler,
Expand Down Expand Up @@ -70,7 +71,7 @@ protected void processPacket() {
}
int errorCode = BookieProtocol.EOK;
long startTimeNanos = MathUtils.nowInNano();
ByteBuf data = null;
ReferenceCounted data = null;
try {
CompletableFuture<Boolean> fenceResult = null;
if (request.isFencing()) {
Expand All @@ -85,9 +86,9 @@ protected void processPacket() {
throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
}
}
data = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId());
data = readData();
if (LOG.isDebugEnabled()) {
LOG.debug("##### Read entry ##### {} -- ref-count: {}", data.readableBytes(), data.refCnt());
LOG.debug("##### Read entry ##### -- ref-count: {}", data.refCnt());
}
if (fenceResult != null) {
handleReadResultForFenceRead(fenceResult, data, startTimeNanos);
Expand Down Expand Up @@ -126,13 +127,17 @@ protected void processPacket() {
sendResponse(data, errorCode, startTimeNanos);
}

private void sendResponse(ByteBuf data, int errorCode, long startTimeNanos) {
/**
* Read the data to send back for the current request.
*
* <p>This implementation reads, from the bookie, the single entry identified by the
* request's ledger id and entry id. The method is {@code protected} so that a subclass
* can supply a different read strategy.
*
* @return the entry that was read
* @throws Exception if the bookie fails to read the entry
*/
protected ReferenceCounted readData() throws Exception {
return requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId());
}

private void sendResponse(ReferenceCounted data, int errorCode, long startTimeNanos) {
final RequestStats stats = requestProcessor.getRequestStats();
final OpStatsLogger logger = stats.getReadEntryStats();
BookieProtocol.Response response;
if (errorCode == BookieProtocol.EOK) {
logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
response = ResponseBuilder.buildReadResponse(data, request);
response = buildReadResponse(data);
} else {
if (data != null) {
ReferenceCountUtil.release(data);
Expand All @@ -145,13 +150,17 @@ private void sendResponse(ByteBuf data, int errorCode, long startTimeNanos) {
recycle();
}

private void sendFenceResponse(Boolean result, ByteBuf data, long startTimeNanos) {
/**
* Build the success response that carries the data that was read for the current request.
*
* @param data the data to return; this implementation casts it to {@link ByteBuf}
* @return the read response for the current request
*/
protected BookieProtocol.Response buildReadResponse(ReferenceCounted data) {
return ResponseBuilder.buildReadResponse((ByteBuf) data, request);
}

private void sendFenceResponse(Boolean result, ReferenceCounted data, long startTimeNanos) {
final int retCode = result != null && result ? BookieProtocol.EOK : BookieProtocol.EIO;
sendResponse(data, retCode, startTimeNanos);
}

private void handleReadResultForFenceRead(CompletableFuture<Boolean> fenceResult,
ByteBuf data,
ReferenceCounted data,
long startTimeNanos) {
if (null != fenceThreadPool) {
fenceResult.whenCompleteAsync(new FutureEventListener<Boolean>() {
Expand Down Expand Up @@ -192,7 +201,9 @@ public String toString() {
void recycle() {
request.recycle();
super.reset();
this.recyclerHandle.recycle(this);
if (this.recyclerHandle != null) {
this.recyclerHandle.recycle(this);
}
}

private final Recycler.Handle<ReadEntryProcessor> recyclerHandle;
Expand All @@ -201,6 +212,10 @@ private ReadEntryProcessor(Recycler.Handle<ReadEntryProcessor> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

/**
* Create a processor that has no recycler handle of its own.
*
* <p>NOTE(review): presumably intended for subclasses that keep their own handle -- confirm.
*/
protected ReadEntryProcessor() {
this.recyclerHandle = null;
}

private static final Recycler<ReadEntryProcessor> RECYCLER = new Recycler<ReadEntryProcessor>() {
@Override
protected ReadEntryProcessor newObject(Recycler.Handle<ReadEntryProcessor> handle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@
package org.apache.bookkeeper.proto;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.util.ByteBufList;

class ResponseBuilder {
static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) {
if (r.getOpCode() == BookieProtocol.ADDENTRY) {
return BookieProtocol.AddResponse.create(r.getProtocolVersion(), errorCode,
r.getLedgerId(), r.getEntryId());
} else {
assert(r.getOpCode() == BookieProtocol.READENTRY);
} else if (r.getOpCode() == BookieProtocol.READENTRY) {
return new BookieProtocol.ReadResponse(r.getProtocolVersion(), errorCode,
r.getLedgerId(), r.getEntryId());
} else {
assert(r.getOpCode() == BookieProtocol.BATCH_READ_ENTRY);
return new BookieProtocol.BatchedReadResponse(r.getProtocolVersion(), errorCode,
r.getLedgerId(), r.getEntryId(), ((BookieProtocol.BatchedReadRequest) r).getRequestId(),
null);
}
}

Expand All @@ -43,4 +48,9 @@ static BookieProtocol.Response buildReadResponse(ByteBuf data, BookieProtocol.Re
return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK,
r.getLedgerId(), r.getEntryId(), data);
}

/**
* Build the success response for a batched read.
*
* @param data the entries to return
* @param r the request being answered; its protocol version, ledger id, entry id and
*          request id are copied into the response
* @return a {@link BookieProtocol.BatchedReadResponse} with code {@code BookieProtocol.EOK}
*/
static BookieProtocol.Response buildBatchedReadResponse(ByteBufList data, BookieProtocol.BatchedReadRequest r) {
return new BookieProtocol.BatchedReadResponse(r.getProtocolVersion(), BookieProtocol.EOK,
r.getLedgerId(), r.getEntryId(), r.getRequestId(), data);
}
}
Loading

0 comments on commit 0b0504f

Please sign in to comment.