Skip to content

Commit

Permalink
Update garbage bin
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Jul 31, 2023
1 parent a97ec93 commit dc0772b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
10 changes: 3 additions & 7 deletions cpp/src/arrow/flight/test_definitions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1903,19 +1903,15 @@ void AsyncClientTest::TestListenerLifetime() {
arrow::Future<FlightInfo> future_;
};

// Bad client code: don't retain a reference to the listener or even the client
// Bad client code: don't retain a reference to the listener
{
ASSERT_OK_AND_ASSIGN(auto location,
Location::ForGrpcTcp("localhost", server_->port()));
ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location));

auto descr = FlightDescriptor::Command("my_command");
auto listener = std::make_shared<Listener>();
listener->future_ = future;
client->GetFlightInfoAsync(descr, std::move(listener));
client_->GetFlightInfoAsync(descr, std::move(listener));
}

ASSERT_OK(future.status());
ASSERT_FINISHES_OK(future);
}

} // namespace flight
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/arrow/flight/transport/grpc/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,12 @@ class GrpcGarbageBin {

void Dispose(std::unique_ptr<internal::AsyncRpc> trash) {
std::unique_lock<std::mutex> guard(grpc_destructor_mutex_);
if (!running_) return;
garbage_bin_.push_back(std::move(trash));
grpc_destructor_cv_.notify_all();
}

~GrpcGarbageBin() {
void Stop() {
{
std::unique_lock<std::mutex> guard(grpc_destructor_mutex_);
running_ = false;
Expand Down Expand Up @@ -837,7 +838,11 @@ class GrpcClientImpl : public internal::ClientTransport {

Status Close() override {
#ifdef GRPC_ENABLE_ASYNC
garbage_bin_.reset();
// XXX: if there are async RPCs running when the client is
// stopped, then when they go to use the garbage bin, they'll
// instead synchronously dispose of resources from the callback
// thread, and will likely crash.
garbage_bin_->Stop();
#endif
// TODO(ARROW-15473): if we track ongoing RPCs, we can cancel them first
// gRPC does not offer a real Close(). We could reset() the gRPC
Expand Down

0 comments on commit dc0772b

Please sign in to comment.