diff --git a/cpp/src/arrow/flight/test_definitions.cc b/cpp/src/arrow/flight/test_definitions.cc index ca6885428c0a0..026999f975a3f 100644 --- a/cpp/src/arrow/flight/test_definitions.cc +++ b/cpp/src/arrow/flight/test_definitions.cc @@ -1903,19 +1903,15 @@ void AsyncClientTest::TestListenerLifetime() { arrow::Future future_; }; - // Bad client code: don't retain a reference to the listener or even the client + // Bad client code: don't retain a reference to the listener { - ASSERT_OK_AND_ASSIGN(auto location, - Location::ForGrpcTcp("localhost", server_->port())); - ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location)); - auto descr = FlightDescriptor::Command("my_command"); auto listener = std::make_shared(); listener->future_ = future; - client->GetFlightInfoAsync(descr, std::move(listener)); + client_->GetFlightInfoAsync(descr, std::move(listener)); } - ASSERT_OK(future.status()); + ASSERT_FINISHES_OK(future); } } // namespace flight diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc index 08a79329b3f96..a7b6a358a6b85 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc @@ -599,11 +599,12 @@ class GrpcGarbageBin { void Dispose(std::unique_ptr trash) { std::unique_lock guard(grpc_destructor_mutex_); + if (!running_) return; garbage_bin_.push_back(std::move(trash)); grpc_destructor_cv_.notify_all(); } - ~GrpcGarbageBin() { + void Stop() { { std::unique_lock guard(grpc_destructor_mutex_); running_ = false; @@ -837,7 +838,11 @@ class GrpcClientImpl : public internal::ClientTransport { Status Close() override { #ifdef GRPC_ENABLE_ASYNC - garbage_bin_.reset(); + // XXX: if there are async RPCs running when the client is + // stopped, then when they go to use the garbage bin, they'll + // instead synchronously dispose of resources from the callback + // thread, and will likely crash. + garbage_bin_->Stop(); #endif // TODO(ARROW-15473): if we track ongoing RPCs, we can cancel them first // gRPC does not offer a real Close(). We could reset() the gRPC