Add back 24.05 response sending path to fix performance #381

Merged: 14 commits, Oct 8, 2024
2 changes: 1 addition & 1 deletion src/infer_request.cc
@@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled)
{
bi::scoped_lock<bi::interprocess_mutex> lock{
*(ipc_message->ResponseMutex())};
stub->SendIPCMessage(ipc_message);
stub->SendIPCUtilsMessage(ipc_message);
ipc_message->ResponseCondition()->wait(lock);
}

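For context, the hunk above keeps the original blocking handshake in `InferRequest::Exec()`: the stub publishes the message while holding the response mutex, then waits on the response condition until the other process signals completion. Below is a minimal, self-contained sketch of that pattern using plain boost::interprocess primitives; `ResponseSync`, `SendAndWait`, and `SignalResponse` are illustrative names, not the backend's real types.

```cpp
#include <functional>

#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

namespace bi = boost::interprocess;

// Stand-in for the mutex/condition pair an IPC message keeps in shared memory.
struct ResponseSync {
  bi::interprocess_mutex mutex;
  bi::interprocess_condition condition;
  bool response_ready = false;  // guards against spurious wakeups
};

// Sender side: publish the message while holding the lock, then block until
// the receiving process flips `response_ready` and notifies the condition.
void SendAndWait(ResponseSync* sync, const std::function<void()>& send_message)
{
  bi::scoped_lock<bi::interprocess_mutex> lock{sync->mutex};
  send_message();  // e.g. push the message handle onto the IPC queue
  while (!sync->response_ready) {
    sync->condition.wait(lock);
  }
}

// Receiver side, after writing the response into shared memory.
void SignalResponse(ResponseSync* sync)
{
  bi::scoped_lock<bi::interprocess_mutex> lock{sync->mutex};
  sync->response_ready = true;
  sync->condition.notify_one();
}
```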
1 change: 1 addition & 0 deletions src/infer_request.h
@@ -96,6 +96,7 @@ class InferRequest {
InferenceTrace& GetTrace();
uint32_t ReleaseFlags();
void SetReleaseFlags(const uint32_t& flags);
intptr_t GetResponseFactoryAddress() { return response_factory_address_; }

#ifdef TRITON_PB_STUB
std::shared_ptr<InferResponse> Exec(const bool is_decoupled);
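A side note on the new accessor: `GetResponseFactoryAddress()` exposes the factory as an `intptr_t`, presumably because a raw pointer is only meaningful in the process that owns it, so it travels across the IPC boundary as a plain integer and is cast back on the owning side. A hedged sketch of that round trip, with a hypothetical `ResponseFactory` type:

```cpp
#include <cstdint>

struct ResponseFactory;  // opaque; stands in for the backend's real factory type

// Pointer -> integer for transport through shared memory.
inline std::intptr_t ToAddress(ResponseFactory* factory)
{
  return reinterpret_cast<std::intptr_t>(factory);
}

// Integer -> pointer, valid only in the process that created the factory.
inline ResponseFactory* FromAddress(std::intptr_t address)
{
  return reinterpret_cast<ResponseFactory*>(address);
}
```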
23 changes: 23 additions & 0 deletions src/ipc_message.cc
@@ -56,6 +56,21 @@ IPCMessage::Create(
new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm));
}

std::unique_ptr<IPCMessage>
IPCMessage::Create(
IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& message_handle)
{
return std::unique_ptr<IPCMessage>(
new IPCMessage(ipc_message_shm, message_handle));
}

AllocatedSharedMemory<IPCMessageShm>&
IPCMessage::GetAllocatedSharedMemory()
{
return ipc_message_shm_;
}

std::unique_ptr<IPCMessage>
IPCMessage::LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
@@ -133,4 +148,12 @@ IPCMessage::IPCMessage(
ipc_message_handle_ = ipc_message_shm_.handle_;
}

IPCMessage::IPCMessage(
IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& handle)
{
ipc_message_handle_ = handle;
ipc_message_shm_ptr_ = ipc_message_shm;
}

}}}; // namespace triton::backend::python
9 changes: 9 additions & 0 deletions src/ipc_message.h
@@ -97,6 +97,10 @@ class IPCMessage {
static std::unique_ptr<IPCMessage> Create(
const std::unique_ptr<SharedMemoryManager>& shm_pool,
bool inline_response);

static std::unique_ptr<IPCMessage> Create(
IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& message_handle);
static std::unique_ptr<IPCMessage> LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
bi::managed_external_buffer::handle_t message_handle);
@@ -108,6 +112,7 @@ class IPCMessage {
bi::interprocess_mutex* ResponseMutex();
bi::managed_external_buffer::handle_t& Args();
bi::managed_external_buffer::handle_t ShmHandle();
AllocatedSharedMemory<IPCMessageShm>& GetAllocatedSharedMemory();

private:
AllocatedSharedMemory<IPCMessageShm> ipc_message_shm_;
@@ -129,6 +134,10 @@ class IPCMessage {
AllocatedSharedMemory<IPCMessageShm>& ipc_message_shm,
AllocatedSharedMemory<bi::interprocess_mutex>& response_mutex_shm,
AllocatedSharedMemory<bi::interprocess_condition>& response_cond_shm);

IPCMessage(
IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& handle);
};

}}}; // namespace triton::backend::python
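Taken together, the new `Create()` overload and private constructor declared above do not allocate anything: they wrap an `IPCMessageShm` header the caller has already placed at the front of a larger shared-memory block. A rough sketch of that non-owning pattern with stand-in types (the real definitions live in this header and in `pb_utils.h`):

```cpp
#include <cstdint>
#include <memory>

// Stand-ins; not the backend's actual definitions.
using shm_handle_t = std::uint64_t;
struct MessageHeader {
  std::uint32_t command;
  shm_handle_t args;
};

// Non-owning view over a header that already lives inside a larger
// allocation. Destroying the view frees nothing; the enclosing block
// stays under the caller's control.
class MessageView {
 public:
  static std::unique_ptr<MessageView> Create(
      MessageHeader* header, shm_handle_t handle)
  {
    return std::unique_ptr<MessageView>(new MessageView(header, handle));
  }
  std::uint32_t& Command() { return header_->command; }
  shm_handle_t& Args() { return header_->args; }
  shm_handle_t Handle() const { return handle_; }

 private:
  MessageView(MessageHeader* header, shm_handle_t handle)
      : header_(header), handle_(handle) {}

  MessageHeader* header_;  // owned by the caller's allocation
  shm_handle_t handle_;
};
```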
144 changes: 117 additions & 27 deletions src/pb_stub.cc
@@ -653,27 +653,20 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
py::list py_request_list =
LoadRequestsFromSharedMemory(request_batch_shm_ptr);
std::unique_ptr<IPCMessage> execute_response =
IPCMessage::Create(shm_pool_, false /* Inline response */);
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
std::unique_ptr<IPCMessage> execute_response;

AllocatedSharedMemory<ResponseBatch> response_batch =
shm_pool_->Construct<ResponseBatch>();
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
execute_response->Args() = response_batch.handle_;
std::optional<AllocatedSharedMemory<char>> response_batch;
bool has_exception = false;
std::string error_string;
std::unique_ptr<PbString> error_string_shm;
std::string err_message;

ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
ScopedDefer _(
[this, &execute_response] { SendIPCMessage(execute_response); });

py::object execute_return;
py::object coroutine_return;
try {
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;

if (!py::hasattr(model_instance_, "execute")) {
std::string message = "Python model " + model_context_.PythonModelPath() +
" does not implement `execute` method.";
@@ -683,8 +676,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
NVTX_RANGE(nvtx_, "PyExecute " + name_);

py::object execute_return =
model_instance_.attr("execute")(py_request_list);
execute_return = model_instance_.attr("execute")(py_request_list);

bool is_coroutine = py::module::import("asyncio")
.attr("iscoroutine")(execute_return)
@@ -694,12 +686,14 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
// Do not wait for async decoupled execute to return.
RunCoroutine(execute_return, true /* in_background */);
} else {
py::object coroutine_return =
coroutine_return =
RunCoroutine(execute_return, false /* in_background */);
ProcessReturnedResponses(py_request_list, coroutine_return);
ProcessReturnedResponses(
py_request_list, coroutine_return, response_batch);
}
} else {
ProcessReturnedResponses(py_request_list, execute_return);
ProcessReturnedResponses(
py_request_list, execute_return, response_batch);
}
}
}
@@ -713,16 +707,34 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
}

if (has_exception) {
std::string err_message =
std::string(
"Failed to process the request(s) for model '" + name_ +
"', message: ") +
error_string;
err_message = std::string(
"Failed to process the request(s) for model '" + name_ +
"', message: ") +
error_string;
LOG_ERROR << err_message.c_str();
if (!response_batch) {
response_batch = shm_pool_->Construct<char>(
sizeof(ResponseBatch) + sizeof(IPCMessageShm));
}
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));

// The backend will clean up the response factory if there is an error in
// the response batch. It is necessary to handle cases where the response
// sender should have already cleaned up, ensuring the backend does not
// delete the response factory again during error handling.
for (py::handle py_request : py_request_list) {
InferRequest* request = py_request.cast<InferRequest*>();
if (request->GetResponseSender()->IsClosed()) {
response_batch_shm_ptr->is_response_factory_deleted = true;
}
}

response_batch_shm_ptr->has_error = true;
error_string_shm = PbString::Create(shm_pool_, err_message);
response_batch_shm_ptr->error = error_string_shm->ShmHandle();
response_batch_shm_ptr->is_error_set = true;
response_batch_shm_ptr->batch_size = 0;
// Once the error is sent to the backend, the backend is supposed to close
// all response factories if not already closed, so closing all response
// senders if not already closed to prevent the model from sending more
Expand All @@ -731,12 +743,47 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
InferRequest* request = py_request.cast<InferRequest*>();
request->GetResponseSender()->Close();
}
} else {
if (!response_batch) {
response_batch = shm_pool_->Construct<char>(
sizeof(ResponseBatch) + sizeof(IPCMessageShm));
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->batch_size = 0;
}
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;
}

execute_response = IPCMessage::Create(
reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()),
response_batch.value().handle_);
execute_response->Args() =
response_batch.value().handle_ + sizeof(IPCMessageShm);
execute_response->InlineResponse() = false;
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
_.Complete();
execute_finalize.Complete();
}

void
Stub::ProcessResponse(InferResponse* response)
{
response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);

for (auto& output_tensor : response->OutputTensors()) {
if (!output_tensor->IsCPU()) {
gpu_tensors_.push_back(output_tensor);
}
}
}

void
Stub::ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj)
py::list py_requests, py::object py_responses_obj,
std::optional<AllocatedSharedMemory<char>>& response_batch)
{
// Return if there is nothing to process.
if (py::isinstance<py::none>(py_responses_obj)) {
@@ -784,12 +831,55 @@ Stub::ProcessReturnedResponses(
"return list, found type '" +
std::string(py::str(py_responses[i].get_type())) + "'.");
}
std::shared_ptr<InferResponse> response =
py_responses[i].cast<std::shared_ptr<InferResponse>>();
request->GetResponseSender()->Send(
response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);

InferResponse* response = py_responses[i].cast<InferResponse*>();
try {
request->GetResponseSender()->UpdateStateAndCounters(
response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
}
catch (const PythonBackendException& pb_exception) {
// Handle the exception here to catch the error when there's a response
// returned from `execute()`.
if (request->GetResponseSender()->IsClosed()) {
response_batch = std::move(shm_pool_->Construct<char>(
sizeof(ResponseBatch) + sizeof(IPCMessageShm)));
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->batch_size = 0;
response_batch_shm_ptr->is_response_factory_deleted = true;
}
throw pb_exception;
}
}
}
// Return all the created responses using response_batch. The reason
// that both of the paths are available is that sending the responses
// using response_batch is faster than using `response_sender`.
response_batch = std::move(shm_pool_->Construct<char>(
sizeof(IPCMessageShm) +
requests_size * sizeof(bi::managed_external_buffer::handle_t) +
sizeof(ResponseBatch)));
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));

bi::managed_external_buffer::handle_t* responses_shm_handle =
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
response_batch.value().data_.get() + sizeof(ResponseBatch) +
sizeof(IPCMessageShm));
for (size_t i = 0; i < responses_size; i++) {
// Check the return type of execute function.
InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
if (!py::isinstance<py::none>(py_responses[i])) {
infer_response->PruneOutputTensors(infer_request->RequestedOutputNames());
ProcessResponse(infer_response);
responses_shm_handle[i] = infer_response->ShmHandle();
} else {
responses_shm_handle[i] = 0;
}
}
response_batch_shm_ptr->batch_size = requests_size;
}

py::object
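The fast path assembled in `Stub::ProcessRequests` and `ProcessReturnedResponses` above packs everything into a single allocation: an `IPCMessageShm` header, then the `ResponseBatch`, then one shared-memory handle per request. A sketch of that offset arithmetic with stand-in struct definitions (alignment is simply assumed adequate here, as it is in the diff):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-ins for the real structs in ipc_message.h and pb_utils.h.
struct IPCMessageShm { std::uint64_t opaque[4]; };
struct ResponseBatch { std::uint32_t batch_size; bool has_error; };
using shm_handle_t = std::uint64_t;

// Layout: [IPCMessageShm][ResponseBatch][handle 0][handle 1]...[handle N-1]
std::vector<char> BuildResponseRegion(std::size_t num_requests)
{
  std::vector<char> region(
      sizeof(IPCMessageShm) + sizeof(ResponseBatch) +
      num_requests * sizeof(shm_handle_t));

  auto* batch = reinterpret_cast<ResponseBatch*>(
      region.data() + sizeof(IPCMessageShm));
  auto* handles = reinterpret_cast<shm_handle_t*>(
      region.data() + sizeof(IPCMessageShm) + sizeof(ResponseBatch));

  batch->batch_size = static_cast<std::uint32_t>(num_requests);
  batch->has_error = false;
  for (std::size_t i = 0; i < num_requests; ++i) {
    handles[i] = 0;  // replaced with each response's shm handle in the real code
  }
  return region;
}
```

Handles left at 0 mark requests whose response slot is `None`, matching the loop at the end of `ProcessReturnedResponses` above.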
5 changes: 4 additions & 1 deletion src/pb_stub.h
@@ -254,7 +254,10 @@ class Stub {
void ProcessRequests(RequestBatch* request_batch_shm_ptr);

void ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj);
py::list py_requests, py::object py_responses_obj,
std::optional<AllocatedSharedMemory<char>>& response_batch);

void ProcessResponse(InferResponse* response);

py::object GetAsyncEventLoop();

3 changes: 3 additions & 0 deletions src/pb_utils.h
@@ -167,6 +167,9 @@ struct ResponseBatch : SendMessageBase {
bool is_error_set;

uint32_t response_size;

// Indicates whether the response factory has been deleted or not.
bool is_response_factory_deleted = false;
};

enum LogLevel { kInfo = 0, kWarning, kError, kVerbose };