Add back 24.05 response sending path to fix performance (#381)
* Add back 24.05 response sender path

* Improve perf

* Fix cleanup

* Review comments

* Fix up

* Fix up

* Fix response factory cleanup

* Fix segfault

* Fix error handling

* Remove extra logs

* Fix up, add comments

* Address comment

* Fix up

---------

Co-authored-by: Iman Tabrizian <[email protected]>
krishung5 and Tabrizian committed Oct 9, 2024
1 parent 35a1c1f commit 4221e84
Showing 11 changed files with 579 additions and 97 deletions.
2 changes: 1 addition & 1 deletion src/infer_request.cc
@@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled)
     {
       bi::scoped_lock<bi::interprocess_mutex> lock{
           *(ipc_message->ResponseMutex())};
-      stub->SendIPCMessage(ipc_message);
+      stub->SendIPCUtilsMessage(ipc_message);
       ipc_message->ResponseCondition()->wait(lock);
     }

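The hunk above swaps the transport call (SendIPCMessage to SendIPCUtilsMessage) but keeps the handshake around it: lock the response mutex, send, then block on the response condition until the peer signals. Below is a minimal, self-contained sketch of that same boost::interprocess pattern; ResponseSlot, SendAndWait, CompleteAndNotify, and the send_message placeholder are illustrative names, not code from this commit.

#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

namespace bi = boost::interprocess;

// Both fields are assumed to live in shared memory, like the repo's
// IPCMessage response mutex/condition pair.
struct ResponseSlot {
  bi::interprocess_mutex mutex;
  bi::interprocess_condition cond;
  bool response_ready = false;
};

// Requester side: publish a message, then sleep until the peer signals.
void SendAndWait(ResponseSlot& slot)
{
  bi::scoped_lock<bi::interprocess_mutex> lock{slot.mutex};
  // send_message(...);  // e.g. the stub's SendIPCUtilsMessage call
  while (!slot.response_ready) {  // loop guards against spurious wakeups
    slot.cond.wait(lock);
  }
}

// Responder side: fill in the result, then wake the waiter.
void CompleteAndNotify(ResponseSlot& slot)
{
  bi::scoped_lock<bi::interprocess_mutex> lock{slot.mutex};
  slot.response_ready = true;
  slot.cond.notify_one();
}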
1 change: 1 addition & 0 deletions src/infer_request.h
@@ -96,6 +96,7 @@ class InferRequest {
   InferenceTrace& GetTrace();
   uint32_t ReleaseFlags();
   void SetReleaseFlags(const uint32_t& flags);
+  intptr_t GetResponseFactoryAddress() { return response_factory_address_; }

 #ifdef TRITON_PB_STUB
   std::shared_ptr<InferResponse> Exec(const bool is_decoupled);
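The new accessor hands out the response factory pointer that the request carries as a raw intptr_t. A hypothetical sketch of the backend-side round-trip (the backend source is not part of this diff; ReclaimFactory and the deleter body are illustrative):

#include <cstdint>
#include <memory>

struct TRITONBACKEND_ResponseFactory;  // opaque Triton type

struct ResponseFactoryDeleter {
  void operator()(TRITONBACKEND_ResponseFactory* factory) const
  {
    // Assumed cleanup, e.g. TRITONBACKEND_ResponseFactoryDelete(factory);
  }
};

void ReclaimFactory(intptr_t factory_address)
{
  // Recover the pointer stored as an integer and give it a single owner,
  // so the factory is deleted exactly once.
  std::unique_ptr<TRITONBACKEND_ResponseFactory, ResponseFactoryDeleter>
      factory(
          reinterpret_cast<TRITONBACKEND_ResponseFactory*>(factory_address));
}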
23 changes: 23 additions & 0 deletions src/ipc_message.cc
@@ -56,6 +56,21 @@ IPCMessage::Create(
       new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm));
 }

+std::unique_ptr<IPCMessage>
+IPCMessage::Create(
+    IPCMessageShm* ipc_message_shm,
+    bi::managed_external_buffer::handle_t& message_handle)
+{
+  return std::unique_ptr<IPCMessage>(
+      new IPCMessage(ipc_message_shm, message_handle));
+}
+
+AllocatedSharedMemory<IPCMessageShm>&
+IPCMessage::GetAllocatedSharedMemory()
+{
+  return ipc_message_shm_;
+}
+
 std::unique_ptr<IPCMessage>
 IPCMessage::LoadFromSharedMemory(
     std::unique_ptr<SharedMemoryManager>& shm_pool,
@@ -133,4 +148,12 @@ IPCMessage::IPCMessage(
   ipc_message_handle_ = ipc_message_shm_.handle_;
 }

+IPCMessage::IPCMessage(
+    IPCMessageShm* ipc_message_shm,
+    bi::managed_external_buffer::handle_t& handle)
+{
+  ipc_message_handle_ = handle;
+  ipc_message_shm_ptr_ = ipc_message_shm;
+}
+
 }}};  // namespace triton::backend::python
9 changes: 9 additions & 0 deletions src/ipc_message.h
@@ -97,6 +97,10 @@ class IPCMessage {
   static std::unique_ptr<IPCMessage> Create(
       const std::unique_ptr<SharedMemoryManager>& shm_pool,
       bool inline_response);
+
+  static std::unique_ptr<IPCMessage> Create(
+      IPCMessageShm* ipc_message_shm,
+      bi::managed_external_buffer::handle_t& message_handle);
   static std::unique_ptr<IPCMessage> LoadFromSharedMemory(
       std::unique_ptr<SharedMemoryManager>& shm_pool,
       bi::managed_external_buffer::handle_t message_handle);
@@ -108,6 +112,7 @@
   bi::interprocess_mutex* ResponseMutex();
   bi::managed_external_buffer::handle_t& Args();
   bi::managed_external_buffer::handle_t ShmHandle();
+  AllocatedSharedMemory<IPCMessageShm>& GetAllocatedSharedMemory();

  private:
   AllocatedSharedMemory<IPCMessageShm> ipc_message_shm_;
@@ -129,6 +134,10 @@
       AllocatedSharedMemory<IPCMessageShm>& ipc_message_shm,
       AllocatedSharedMemory<bi::interprocess_mutex>& response_mutex_shm,
       AllocatedSharedMemory<bi::interprocess_condition>& response_cond_shm);
+
+  IPCMessage(
+      IPCMessageShm* ipc_message_shm,
+      bi::managed_external_buffer::handle_t& handle);
 };

 }}};  // namespace triton::backend::python
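Taken together, the new constructor, the Create() overload, and GetAllocatedSharedMemory() let an IPCMessage wrap a header that already lives inside a caller-owned allocation, instead of allocating its own shared-memory block. A fragment sketching the intended use, mirroring the pb_stub.cc hunks below; payload_size is an assumed placeholder, and shm_pool_, IPCMessageShm, and AllocatedSharedMemory come from the repo:

// One allocation holds the IPCMessageShm header followed by the payload.
AllocatedSharedMemory<char> buffer =
    shm_pool_->Construct<char>(sizeof(IPCMessageShm) + payload_size);

// Wrap the in-place header; no second shared-memory allocation is made.
std::unique_ptr<IPCMessage> msg = IPCMessage::Create(
    reinterpret_cast<IPCMessageShm*>(buffer.data_.get()), buffer.handle_);

// Point Args() at the payload that follows the header in the same buffer.
msg->Args() = buffer.handle_ + sizeof(IPCMessageShm);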
146 changes: 119 additions & 27 deletions src/pb_stub.cc
@@ -653,27 +653,20 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
 {
   py::list py_request_list =
       LoadRequestsFromSharedMemory(request_batch_shm_ptr);
-  std::unique_ptr<IPCMessage> execute_response =
-      IPCMessage::Create(shm_pool_, false /* Inline response */);
-  execute_response->Command() = PYTHONSTUB_ExecuteResponse;
+  std::unique_ptr<IPCMessage> execute_response;

-  AllocatedSharedMemory<ResponseBatch> response_batch =
-      shm_pool_->Construct<ResponseBatch>();
-  ResponseBatch* response_batch_shm_ptr =
-      reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
-  execute_response->Args() = response_batch.handle_;
+  std::optional<AllocatedSharedMemory<char>> response_batch;
   bool has_exception = false;
   std::string error_string;
   std::unique_ptr<PbString> error_string_shm;
+  std::string err_message;

   ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
   ScopedDefer _(
       [this, &execute_response] { SendIPCMessage(execute_response); });
-
+  py::object execute_return;
+  py::object coroutine_return;
   try {
-    response_batch_shm_ptr->has_error = false;
-    response_batch_shm_ptr->is_error_set = false;
-
     if (!py::hasattr(model_instance_, "execute")) {
       std::string message = "Python model " + model_context_.PythonModelPath() +
           " does not implement `execute` method.";
@@ -683,8 +676,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
     {
       NVTX_RANGE(nvtx_, "PyExecute " + name_);

-      py::object execute_return =
-          model_instance_.attr("execute")(py_request_list);
+      execute_return = model_instance_.attr("execute")(py_request_list);

       bool is_coroutine = py::module::import("asyncio")
                               .attr("iscoroutine")(execute_return)
@@ -694,12 +686,14 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
         // Do not wait for async decoupled execute to return.
         RunCoroutine(execute_return, true /* in_background */);
       } else {
-        py::object coroutine_return =
+        coroutine_return =
             RunCoroutine(execute_return, false /* in_background */);
-        ProcessReturnedResponses(py_request_list, coroutine_return);
+        ProcessReturnedResponses(
+            py_request_list, coroutine_return, response_batch);
       }
     } else {
-      ProcessReturnedResponses(py_request_list, execute_return);
+      ProcessReturnedResponses(
+          py_request_list, execute_return, response_batch);
     }
   }
 }
@@ -713,16 +707,36 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
   }

   if (has_exception) {
-    std::string err_message =
-        std::string(
-            "Failed to process the request(s) for model '" + name_ +
-            "', message: ") +
-        error_string;
+    err_message = std::string(
+                      "Failed to process the request(s) for model '" + name_ +
+                      "', message: ") +
+                  error_string;
     LOG_ERROR << err_message.c_str();
+    if (!response_batch) {
+      response_batch = shm_pool_->Construct<char>(
+          sizeof(ResponseBatch) + sizeof(IPCMessageShm));
+    }
+    ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
+        response_batch.value().data_.get() + sizeof(IPCMessageShm));
+
+    // The backend will clean up the response factory if there is an error in
+    // the response batch. For decoupled mode, it is necessary to handle cases
+    // where the response sender should have already cleaned up, ensuring the
+    // backend does not delete the response factory again during error handling.
+    if (IsDecoupled()) {
+      for (py::handle py_request : py_request_list) {
+        InferRequest* request = py_request.cast<InferRequest*>();
+        if (request->GetResponseSender()->IsClosed()) {
+          response_batch_shm_ptr->is_response_factory_deleted = true;
+        }
+      }
+    }
+
     response_batch_shm_ptr->has_error = true;
     error_string_shm = PbString::Create(shm_pool_, err_message);
     response_batch_shm_ptr->error = error_string_shm->ShmHandle();
     response_batch_shm_ptr->is_error_set = true;
+    response_batch_shm_ptr->batch_size = 0;
     // Once the error is sent to the backend, the backend is supposed to close
     // all response factories if not already closed, so closing all response
     // senders if not already closed to prevent the model from sending more
@@ -731,12 +745,47 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
       InferRequest* request = py_request.cast<InferRequest*>();
       request->GetResponseSender()->Close();
     }
+  } else {
+    if (!response_batch) {
+      response_batch = shm_pool_->Construct<char>(
+          sizeof(ResponseBatch) + sizeof(IPCMessageShm));
+      ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
+          response_batch.value().data_.get() + sizeof(IPCMessageShm));
+      response_batch_shm_ptr->batch_size = 0;
+    }
+    ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
+        response_batch.value().data_.get() + sizeof(IPCMessageShm));
+    response_batch_shm_ptr->has_error = false;
+    response_batch_shm_ptr->is_error_set = false;
   }
+
+  execute_response = IPCMessage::Create(
+      reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()),
+      response_batch.value().handle_);
+  execute_response->Args() =
+      response_batch.value().handle_ + sizeof(IPCMessageShm);
+  execute_response->InlineResponse() = false;
+  execute_response->Command() = PYTHONSTUB_ExecuteResponse;
+  _.Complete();
+  execute_finalize.Complete();
 }

 void
+Stub::ProcessResponse(InferResponse* response)
+{
+  response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);
+
+  for (auto& output_tensor : response->OutputTensors()) {
+    if (!output_tensor->IsCPU()) {
+      gpu_tensors_.push_back(output_tensor);
+    }
+  }
+}
+
+void
 Stub::ProcessReturnedResponses(
-    py::list py_requests, py::object py_responses_obj)
+    py::list py_requests, py::object py_responses_obj,
+    std::optional<AllocatedSharedMemory<char>>& response_batch)
 {
   // Return if there is nothing to process.
   if (py::isinstance<py::none>(py_responses_obj)) {
@@ -784,12 +833,55 @@ Stub::ProcessReturnedResponses(
             "return list, found type '" +
             std::string(py::str(py_responses[i].get_type())) + "'.");
       }
-      std::shared_ptr<InferResponse> response =
-          py_responses[i].cast<std::shared_ptr<InferResponse>>();
-      request->GetResponseSender()->Send(
-          response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+
+      InferResponse* response = py_responses[i].cast<InferResponse*>();
+      try {
+        request->GetResponseSender()->UpdateStateAndCounters(
+            response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+      }
+      catch (const PythonBackendException& pb_exception) {
+        // Handle the exception here to catch the error when there's a response
+        // returned from `execute()`.
+        if (request->GetResponseSender()->IsClosed()) {
+          response_batch = std::move(shm_pool_->Construct<char>(
+              sizeof(ResponseBatch) + sizeof(IPCMessageShm)));
+          ResponseBatch* response_batch_shm_ptr =
+              reinterpret_cast<ResponseBatch*>(
+                  response_batch.value().data_.get() + sizeof(IPCMessageShm));
+          response_batch_shm_ptr->batch_size = 0;
+          response_batch_shm_ptr->is_response_factory_deleted = true;
+        }
+        throw pb_exception;
+      }
     }
   }
+  // Return all the created responses using response_batch. The reason
+  // that both of the paths are available is that sending the responses
+  // using response_batch is faster than using `response_sender`.
+  response_batch = std::move(shm_pool_->Construct<char>(
+      sizeof(IPCMessageShm) +
+      requests_size * sizeof(bi::managed_external_buffer::handle_t) +
+      sizeof(ResponseBatch)));
+  ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
+      response_batch.value().data_.get() + sizeof(IPCMessageShm));
+
+  bi::managed_external_buffer::handle_t* responses_shm_handle =
+      reinterpret_cast<bi::managed_external_buffer::handle_t*>(
+          response_batch.value().data_.get() + sizeof(ResponseBatch) +
+          sizeof(IPCMessageShm));
+  for (size_t i = 0; i < responses_size; i++) {
+    // Check the return type of execute function.
+    InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
+    InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
+    if (!py::isinstance<py::none>(py_responses[i])) {
+      infer_response->PruneOutputTensors(infer_request->RequestedOutputNames());
+      ProcessResponse(infer_response);
+      responses_shm_handle[i] = infer_response->ShmHandle();
+    } else {
+      responses_shm_handle[i] = 0;
+    }
+  }
+  response_batch_shm_ptr->batch_size = requests_size;
 }

 py::object
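The batched path above packs the message header, the batch metadata, and one response handle per request into a single shared-memory allocation, which is what makes it cheaper than a per-response response_sender round trip. A sketch of the layout and the offset arithmetic the hunks use (struct names are the repo's; compatible alignment at these raw byte offsets is assumed):

// [ IPCMessageShm | ResponseBatch | handle_t[0 .. batch_size) ]
char* base = response_batch.value().data_.get();

ResponseBatch* batch =
    reinterpret_cast<ResponseBatch*>(base + sizeof(IPCMessageShm));

bi::managed_external_buffer::handle_t* handles =
    reinterpret_cast<bi::managed_external_buffer::handle_t*>(
        base + sizeof(IPCMessageShm) + sizeof(ResponseBatch));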
5 changes: 4 additions & 1 deletion src/pb_stub.h
@@ -254,7 +254,10 @@ class Stub {
   void ProcessRequests(RequestBatch* request_batch_shm_ptr);

   void ProcessReturnedResponses(
-      py::list py_requests, py::object py_responses_obj);
+      py::list py_requests, py::object py_responses_obj,
+      std::optional<AllocatedSharedMemory<char>>& response_batch);
+
+  void ProcessResponse(InferResponse* response);

   py::object GetAsyncEventLoop();

3 changes: 3 additions & 0 deletions src/pb_utils.h
@@ -167,6 +167,9 @@ struct ResponseBatch : SendMessageBase {
   bool is_error_set;

   uint32_t response_size;
+
+  // Indicates whether the response factory has been deleted or not.
+  bool is_response_factory_deleted = false;
 };

 enum LogLevel { kInfo = 0, kWarning, kError, kVerbose };
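The new flag rides along with an error response so the stub and the backend agree on which side destroys the response factory. A hypothetical backend-side check (the backend source is not in this diff; the condition is an assumption about how the flag is consumed):

// Assumed consumer: clean up the factory only if the stub has not already
// deleted it through the response sender.
if (response_batch_shm_ptr->has_error &&
    !response_batch_shm_ptr->is_response_factory_deleted) {
  // delete/close the response factory exactly once
}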