Description
Это копия задачи [YDBREQUESTS-1561].
Далее текст и комментарии просто скопированы из задачи.
Я использую C++ SDK в async-стиле:
// YQL text of the lookup: selects device_id from device_subscriptions for rows
// matching the $shard_key and $yandexid parameters with subscribed = false.
// The raw string literal must open with R"( so that it pairs with the closing )".
static constexpr auto query = R"(
DECLARE $shard_key AS Uint64;
DECLARE $yandexid AS String;
SELECT device_id
FROM device_subscriptions
WHERE shard_key = $shard_key
AND yandexid = $yandexid
AND subscribed = false;
)";
auto params = session.GetParamsBuilder()
.AddParam("$shard_key")
.Uint64(LegacyComputeShardKey(puid))
.Build()
.AddParam("$yandexid")
.String(puid)
.Build()
.Build();
return session.ExecuteDataQuery(
query,
NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::OnlineRO()).CommitTx(),
std::move(params),
ExecDataQuerySettings_
).Apply([resultSet](const NYdb::NTable::TAsyncDataQueryResult& fut) mutable -> NYdb::TStatus {
const auto& res = fut.GetValueSync();
if (res.IsSuccess()) {
*resultSet = res.GetResultSet(0);
}
return res;
});
}, RetryOperationSettings_).Apply([resultSet, operationContext = std::move(operationContext)](const NYdb::TAsyncStatus& fut) mutable -> TExpected<TVector<TString>, TString> {
const auto& status = fut.GetValueSync();
if (const auto opRes = operationContext.ReportResult(status); !opRes) {
return opRes.Error();
}
NYdb::TResultSetParser parser(resultSet->GetRef());
TVector<TString> result;
result.reserve(parser.RowsCount());
while (parser.TryNextRow()) {
result.emplace_back(*parser.ColumnParser("device_id").GetOptionalString());
}
return result;
});
И в таком коде у меня много цепочек, т.е. вполне может быть:
[...](...) {
// ...
return AnotherYdbRequest().Apply(
// ...
);
}
)
Т.е. я умею делать долгие цепочки Apply’ев с верой, что ydb всегда выставит promise.
В настройках драйвера я выставляю такие опции:
// Driver configuration: endpoint, database name and network thread count are
// taken from `config`; the auth token is read from the YDB_TOKEN environment
// variable. SetClientThreadsNum / SetMaxClientQueueSize are intentionally not
// called here.
auto driverConfig = NYdb::TDriverConfig()
.SetEndpoint(config.GetAddress())
.SetDatabase(config.GetDBName())
.SetAuthToken(GetEnv("YDB_TOKEN"))
.SetBalancingPolicy(NYdb::EBalancingPolicy::UsePreferableLocation)
.SetNetworkThreadsNum(config.GetNetworkThreads())
.SetDiscoveryMode(NYdb::EDiscoveryMode::Async);
Т.е. я не выставляю опции SetClientThreadsNum и SetMaxClientQueueSize, из-за которых может случиться deadlock (у меня используется adaptive pool).
В настройках table клиента я выставляю такие опции:
, Client_(
driver,
NYdb::NTable::TClientSettings()
.UseQueryCache(false)
.SessionPoolSettings(
NYdb::NTable::TSessionPoolSettings()
.MaxActiveSessions(config.GetMaxActiveSessions())
)
)
, RetryOperationSettings_(NYdb::NTable::TRetryOperationSettings().MaxRetries(config.GetMaxRetries()))
, ExecDataQuerySettings_(
NYdb::NTable::TExecDataQuerySettings()
.OperationTimeout(FromString<TDuration>(config.GetOperationTimeout()))
.ClientTimeout(FromString<TDuration>(config.GetClientTimeout()))
.CancelAfter(FromString<TDuration>(config.GetCancelAfter()))
.KeepInQueryCache(true)
)
(Именно благодаря опции CancelAfter я верю, что значение future с запросом будет выставлено)
Если это важно - у меня поверх одного драйвера может быть 2+ table клиента.
И примерно при таком setup’е я получил deadlock.
Все мои thread’ы заблокированы в ожидании Add adaptive pool’а:
#0 0x00007fec79b27ad3 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/x86_64-linux-gnu/libpthread.so.0
#1 0x00000000025e3e34 in TCondVar::TImpl::WaitD (this=0x474c3fc9673c, lock=..., deadLine=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/condvar.cpp:99
#2 TCondVar::WaitD (this=0x474c3fb000f0, mutex=..., deadLine=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/condvar.cpp:144
#3 TCondVar::WaitI (this=0x474c3fb000f0, m=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/condvar.h:50
#4 TCondVar::Wait (this=0x474c3fb000f0, m=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/condvar.h:60
#5 TAdaptiveThreadPool::TImpl::Add (this=0x474c3fb000b0, obj=0x474c3c54d200) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/thread/pool.cpp:448
#6 0x00000000025e3b5a in TAdaptiveThreadPool::Add (this=<optimized out>, obj=0x80) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/thread/pool.cpp:570
#7 0x0000000003ae46e9 in NYdb::TGRpcConnectionsImpl::EnqueueResponse (this=0x474c3f80c180, action=0x474c3c54d200) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp:421
#8 NYdb::TGRpcConnectionsImpl::ScheduleOneTimeTask(std::__y1::function<void ()>&&, TDuration)::$_1::operator()(NYql::TIssues&&, NYdb::EStatus) (this=0x7fec6dd3fcb0, status=<optimized out>) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp:229
#9 0x0000000003ae4445 in NYdb::TGRpcConnectionsImpl::ScheduleOneTimeTask(std::__y1::function<void ()>&&, TDuration) (this=0x474c3f80c180, fn=..., timeout=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp:238
#10 0x0000000003bb6480 in NYdb::NTable::TSessionPoolImpl::CreateFakeSession (promise=..., client=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/ydb_table/table.cpp:2869
#11 0x0000000003bb6cc3 in NYdb::NTable::TSessionPoolImpl::GetSession (this=0x474c3fbc34c0, client=warning: RTTI symbol not found for class 'std::__y1::__shared_ptr_pointer<NYdb::NTable::TTableClient::TImpl*, std::__y1::shared_ptr<NYdb::NTable::TTableClient::TImpl>::__shared_ptr_default_delete<NYdb::NTable::TTableClient::TImpl, NYdb::NTable::TTableClient::TImpl>, std::__y1::allocator<NYdb::NTable::TTableClient::TImpl> >'
warning: RTTI symbol not found for class 'std::__y1::__shared_ptr_pointer<NYdb::NTable::TTableClient::TImpl*, std::__y1::shared_ptr<NYdb::NTable::TTableClient::TImpl>::__shared_ptr_default_delete<NYdb::NTable::TTableClient::TImpl, NYdb::NTable::TTableClient::TImpl>, std::__y1::allocator<NYdb::NTable::TTableClient::TImpl> >'
std::__y1::shared_ptr (count 3717, weak 5) = 0x474c3fbc33c0, settings=..., sessionProvider=0x3bb5140 <NYdb::NTable::TTableClient::TImpl::SettlerAwareSessonProvider(std::__y1::shared_ptr<NYdb::NTable::TTableClient::TImpl>, NYdb::NTable::TCreateSessionSettings const&)>) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/ydb_table/table.cpp:2905
#12 0x0000000003bb95df in NYdb::NTable::TTableClient::TImpl::GetSession (this=<optimized out>, settings=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/ydb_table/table.cpp:1854
#13 0x0000000003be4158 in NYdb::NTable::TTableClient::GetSession (this=0x474c0acf6748, settings=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/ydb_table/table.cpp:3132
#14 NYdb::NTable::TRetryOperationWithSession::Execute (this=0x474c0acf66f0) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/ydb_table/table.cpp:3290
#15 0x0000000003bb98a6 in NYdb::NTable::TTableClient::RetryOperation(std::__y1::function<NThreading::TFuture<NYdb::TStatus> (NYdb::NTable::TSession)>&&, NYdb::NTable::TRetryOperationSettings const&) (this=<optimized out>, operation=..., settings=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/ydb_table/table.cpp:3329
// table client call from here
#16 0x0000000003b89a50 in NMatrix::NNotificator::TConnectionsStorage::UpdateConnectionsWithFullStateRemoveAll (this=<optimized out>, endpoint=..., logContext=..., metrics=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/alice/matrix/notificator/library/storages/connections/storage.cpp:320
// my code stack
Треды ydb, кажется, тоже все заблокированы на add в этот thread pool:
Thread 14 (Thread 0x7fec72d4b700 (LWP 158)):
#0 0x00007fec79b27ad3 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/x86_64-linux-gnu/libpthread.so.0
#1 0x00000000025e3e34 in TCondVar::TImpl::WaitD (this=0x474c3fc9673c, lock=..., deadLine=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/condvar.cpp:99
#2 TCondVar::WaitD (this=0x474c3fb000f0, mutex=..., deadLine=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/condvar.cpp:144
#3 TCondVar::WaitI (this=0x474c3fb000f0, m=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/condvar.h:50
#4 TCondVar::Wait (this=0x474c3fb000f0, m=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/condvar.h:60
#5 TAdaptiveThreadPool::TImpl::Add (this=0x474c3fb000b0, obj=0x474c37206040) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/thread/pool.cpp:448
#6 0x00000000025e3b5a in TAdaptiveThreadPool::Add (this=<optimized out>, obj=0x80) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/thread/pool.cpp:570
#7 0x0000000003cb6ca7 in NYdb::TGRpcConnectionsImpl::EnqueueResponse (this=0x474c3f80c180, action=0x474c37206040) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp:421
#8 NYdb::TGRpcConnectionsImpl::Run<Ydb::Table::V1::TableService, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse>(Ydb::Table::ExecuteDataQueryRequest&&, std::__y1::function<void (Ydb::Table::ExecuteDataQueryResponse*, NYdb::TPlainStatus)>&&, NGrpc::TSimpleRequestProcessor<Ydb::Table::V1::TableService::Stub, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse>::TAsyncRequest, std::__y1::shared_ptr<NYdb::TDbDriverState>, NYdb::TRpcRequestSettings const&, TDuration, NYdb::TEndpointKey const&, std::__y1::shared_ptr<NGrpc::IQueueClientContext>)::{lambda(NYdb::TPlainStatus, std::__y1::unique_ptr<NGrpc::TServiceConnection<Ydb::Table::V1::TableService>, std::__y1::default_delete<NGrpc::TServiceConnection<Ydb::Table::V1::TableService> > >, NYdb::TEndpointKey)#1}::operator()(NYdb::TPlainStatus, std::__y1::unique_ptr<NGrpc::TServiceConnection<Ydb::Table::V1::TableService>, std::__y1::default_delete<NGrpc::TServiceConnection<Ydb::Table::V1::TableService> > >, NYdb::TEndpointKey)::{lambda(grpc::ClientContext const&, NGrpc::TGrpcStatus&&, Ydb::Table::ExecuteDataQueryResponse&&)#1}::operator()(grpc::ClientContext const, NGrpc::TGrpcStatus, NGrpc::TGrpcStatus&&) (this=<optimized out>, ctx=..., grpcStatus=..., response=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h:250
#9 0x0000000003cb876c in std::__y1::__function::__value_func<void (grpc::ClientContext const&, NGrpc::TGrpcStatus&&, Ydb::Table::ExecuteDataQueryResponse&&)>::operator()(grpc::ClientContext const&, NGrpc::TGrpcStatus&&, Ydb::Table::ExecuteDataQueryResponse&&) const (this=0x474c3fbf5d30, __args=..., __args=..., __args=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/contrib/libs/cxxsupp/libcxx/include/__functional/function.h:508
#10 std::__y1::function<void (grpc::ClientContext const&, NGrpc::TGrpcStatus&&, Ydb::Table::ExecuteDataQueryResponse&&)>::operator()(grpc::ClientContext const&, NGrpc::TGrpcStatus&&, Ydb::Table::ExecuteDataQueryResponse&&) const (this=0x474c3fbf5d30, __arg=..., __arg=..., __arg=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/contrib/libs/cxxsupp/libcxx/include/__functional/function.h:1192
#11 NGrpc::TAdvancedRequestProcessor<Ydb::Table::V1::TableService::Stub, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse>::Execute (this=0x474c3fbf5b80, ok=true) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/library/cpp/grpc/client/grpc_client_low.h:344
#12 0x0000000003b36a18 in NGrpc::PullEvents (cq=0x474c3fc07a80) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/library/cpp/grpc/client/grpc_client_low.cpp:194
#13 NGrpc::TGRpcClientLow::Init(unsigned long)::$_2::operator()() const (this=<optimized out>) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/library/cpp/grpc/client/grpc_client_low.cpp:430
#14 std::__y1::__invoke<NGrpc::TGRpcClientLow::Init(unsigned long)::$_2&>(NGrpc::TGRpcClientLow::Init(unsigned long)::$_2&) (__f=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/contrib/libs/cxxsupp/libcxx/include/type_traits:3671
#15 std::__y1::__invoke_void_return_wrapper<void, true>::__call<NGrpc::TGRpcClientLow::Init(unsigned long)::$_2&>(NGrpc::TGRpcClientLow::Init(unsigned long)::$_2&) (__args=...) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/contrib/libs/cxxsupp/libcxx/include/__functional/invoke.h:61
#16 std::__y1::__function::__alloc_func<NGrpc::TGRpcClientLow::Init(unsigned long)::$_2, std::__y1::allocator<NGrpc::TGRpcClientLow::Init(unsigned long)::$_2>, void ()>::operator()() (this=<optimized out>) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/contrib/libs/cxxsupp/libcxx/include/__functional/function.h:181
#17 std::__y1::__function::__func<NGrpc::TGRpcClientLow::Init(unsigned long)::$_2, std::__y1::allocator<NGrpc::TGRpcClientLow::Init(unsigned long)::$_2>, void ()>::operator()() (this=<optimized out>) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/contrib/libs/cxxsupp/libcxx/include/__functional/function.h:355
#18 0x00000000025e5546 in std::__y1::__function::__value_func<void ()>::operator()() const (this=0x474c3fc0c4d0) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/contrib/libs/cxxsupp/libcxx/include/__functional/function.h:508
#19 std::__y1::function<void ()>::operator()() const (this=0x474c3fc0c4d0) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/contrib/libs/cxxsupp/libcxx/include/__functional/function.h:1192
#20 (anonymous namespace)::TThreadFactoryFuncObj::DoExecute (this=0x474c3fc0c4c0) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/thread/factory.cpp:61
#21 0x00000000025e5967 in IThreadFactory::IThreadAble::Execute (this=0x474c3fc9673c) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/thread/factory.h:15
#22 (anonymous namespace)::TSystemThreadFactory::TPoolThread::ThreadProc (func=0x474c3fc9673c) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/thread/factory.cpp:36
#23 0x00000000022149a6 in (anonymous namespace)::TPosixThread::ThreadProxy (arg=0x474c3fc950c0) at /place/sandbox-data/tasks/7/0/1382850307/__FUSE/mount_path_ccf18fa7-3c1f-4017-a724-9e910c2ec9b1/util/system/thread.cpp:229
#24 0x00007fec79b216db in start_thread () from /lib/x86_64-linux-gnu/libpthread.so.0
#25 0x00007fec7984a71f in clone () from /lib/x86_64-linux-gnu/libc.so.6
(Но тут я проглядел бегло, честно скажу, мог что-то пропустить)
К сожалению, сглупил и не оставил core-файл (есть только дамп бэктрейса всех тредов).
Можете подсказать, что я делаю не так и как можно избежать этого deadlock’а?