From 28fdb7746b1034c70b52473dea7ec9557a748644 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Wed, 8 Jan 2025 17:44:11 +0100 Subject: [PATCH 1/6] [C++ SDK] Remove session from session pool if stream session was closed by server side. --- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 221 +++++++++++++++++- .../kqp_session_common/kqp_session_common.cpp | 25 +- .../kqp_session_common/kqp_session_common.h | 22 +- .../session_pool/session_pool.cpp | 32 +++ .../ydb_internal/session_pool/session_pool.h | 4 +- .../sdk/cpp/client/ydb_query/client.cpp | 2 +- .../client/ydb_query/impl/client_session.cpp | 19 +- .../client/ydb_query/impl/client_session.h | 10 +- 8 files changed, 320 insertions(+), 15 deletions(-) diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index db0476149f67..2bc2b5223c66 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -11,6 +11,8 @@ #include #include +#include +#include #include @@ -54,7 +56,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { WaitForZeroSessions(counters); } - Y_UNIT_TEST(QueryOnClosedSession) { + void DoClosedSessionRemovedWhileActioveTest(bool withQuery) { auto kikimr = DefaultKikimrRunner(); auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); @@ -75,10 +77,12 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT(allDoneOk); - auto execResult = session.ExecuteQuery("SELECT 1;", - NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + if (withQuery) { + auto execResult = session.ExecuteQuery("SELECT 1;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION); + UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION); + } } // closed session must be removed from session pool { @@ -90,6 +94,115 @@ 
Y_UNIT_TEST_SUITE(KqpQueryService) { WaitForZeroSessions(counters); } + Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithQuery) { + // - Session is active (user gfot it) + // - server close it + // - user executes query (got BAD SESSION) + // - session should be removed from pool + DoClosedSessionRemovedWhileActioveTest(true); + } + +/* Not implemented in the sdk + Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithoutQuery) { + // - Session is active (user gfot it) + // - server close it + // - user do not executes any query + // - session should be removed from pool + DoClosedSessionRemovedWhileActioveTest(false); + } +*/ + // Copy paster from table service but with some modifications for query service + // Checks read iterators/session/sdk counters have expected values + Y_UNIT_TEST(CloseSessionsWithLoad) { + auto kikimr = std::make_shared(); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG); + + NYdb::NQuery::TQueryClient db = kikimr->GetQueryClient(); + + const ui32 SessionsCount = 50; + const TDuration WaitDuration = TDuration::Seconds(1); + + TVector sessions; + for (ui32 i = 0; i < SessionsCount; ++i) { + auto sessionResult = db.GetSession().GetValueSync(); + UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString()); + + sessions.push_back(sessionResult.GetSession()); + } + + NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1); + NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable { + if (id == (i32)sessions.size()) { + Sleep(WaitDuration); + Cerr << "start sessions close....." 
<< Endl; + auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr->GetEndpoint()); + for (ui32 i = 0; i < sessions.size(); ++i) { + bool allDoneOk = true; + NTestHelpers::CheckDelete(clientConfig, sessions[i].GetId(), Ydb::StatusIds::SUCCESS, allDoneOk); + UNIT_ASSERT(allDoneOk); + } + + Cerr << "finished sessions close....." << Endl; + auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb"); + + ui64 pendingCompilations = 0; + do { + Sleep(WaitDuration); + pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val(); + Cerr << "still compiling... " << pendingCompilations << Endl; + } while (pendingCompilations != 0); + + ui64 pendingSessions = 0; + do { + Sleep(WaitDuration); + pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val(); + Cerr << "still active sessions ... " << pendingSessions << Endl; + } while (pendingSessions != 0); + + return; + } + + auto session = sessions[id]; + TMaybe tx; + + while (true) { + if (tx) { + auto result = tx->Commit().GetValueSync(); + if (!result.IsSuccess()) { + return; + } + + tx = {}; + continue; + } + + auto query = Sprintf(R"( + SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0; + SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1; + SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2; + SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3; + SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4; + SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5; + + UPSERT INTO `/Root/EightShard` (Key, Text) VALUES + (%2$dul, "New"); + )", RandomNumber(), RandomNumber()); + + auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync(); + if (!result.IsSuccess()) { + Cerr << "received non-success status for session " << id << Endl; + return; + } + + tx = result.GetTransaction(); + } + }, 0, 
SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard"); + } + Y_UNIT_TEST(PeriodicTaskInSessionPool) { auto kikimr = DefaultKikimrRunner(); auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); @@ -169,6 +282,106 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { WaitForZeroSessions(counters); } + // Check closed session removed while its in the sessio pool + Y_UNIT_TEST(ClosedSessionRemovedFromPool) { + auto kikimr = DefaultKikimrRunner(); + auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + { + auto db = kikimr.GetQueryClient(); + + Cerr << "START TEST...." << Endl; + TString id; + { + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetSession().GetId()); + auto session = result.GetSession(); + id = session.GetId(); + } + + bool allDoneOk = true; + NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk); + Cerr << "DELETED: " << id << Endl; + + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT(allDoneOk); + { + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + auto newSession = result.GetSession(); + UNIT_ASSERT_C(newSession.GetId() != id, "closed id: " << id << " new id: " << newSession.GetId()); + + auto execResult = newSession.ExecuteQuery("SELECT 1;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS); + } + } + WaitForZeroSessions(counters); + } + + // Attempt to trigger simultanous server side close and return session + // From sdk perspective check no dataraces + Y_UNIT_TEST(ReturnAndCloseSameTime) { + 
auto kikimr = DefaultKikimrRunner(); + auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + size_t iterations = 999; + auto db = kikimr.GetQueryClient(); + + NPar::LocalExecutor().RunAdditionalThreads(2); + while (iterations--) { + auto lim = iterations % 33; + TVector sessions; + TVector sids; + sessions.reserve(lim); + sids.reserve(lim); + for (size_t i = 0; i < lim; ++i) { + auto sessionResult = db.GetSession().GetValueSync(); + UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString()); + + sessions.push_back(sessionResult.GetSession()); + sids.push_back(sessions.back().GetId()); + } + + if (iterations & 1) { + auto rng = std::default_random_engine {}; + std::ranges::shuffle(sids, rng); + } + + NPar::LocalExecutor().ExecRange([sessions{std::move(sessions)}, sids{std::move(sids)}, clientConfig](int id) mutable{ + if (id == 0) { + for (size_t i = 0; i < sessions.size(); i++) { + auto s = std::move(sessions[i]); + auto execResult = s.ExecuteQuery("SELECT 1;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + switch (execResult.GetStatus()) { + case EStatus::SUCCESS: + case EStatus::BAD_SESSION: + break; + default: + UNIT_ASSERT_C(false, "unexpected status: " << execResult.GetStatus()); + } + } + } else if (id == 1) { + for (size_t i = 0; i < sids.size(); i++) { + bool allDoneOk = true; + NTestHelpers::CheckDelete(clientConfig, sids[i], Ydb::StatusIds::SUCCESS, allDoneOk); + UNIT_ASSERT(allDoneOk); + } + } else { + Y_ABORT_UNLESS(false, "unexpected thread cxount"); + } + }, 0, 2, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + } + + WaitForZeroSessions(counters); + } + Y_UNIT_TEST(StreamExecuteQueryPure) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp 
b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp index 4cb5361cf0e8..c6014d853d3b 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp @@ -36,6 +36,7 @@ TKqpSessionCommon::TKqpSessionCommon( , State_(S_STANDALONE) , TimeToTouch_(TInstant::Now()) , TimeInPast_(TInstant::Now()) + , CloseHandler_(nullptr) , NeedUpdateActiveCounter_(false) {} @@ -115,7 +116,7 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval, if (updateTimeInPast) { TimeInPast_ = now; } - TimeToTouch_ = now + interval; + TimeToTouch_.store(now + interval, std::memory_order_relaxed); } } @@ -126,11 +127,11 @@ void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval, if (updateTimeInPast) { TimeInPast_ = now; } - TimeToTouch_ = now + interval; + TimeToTouch_.store(now + interval, std::memory_order_relaxed); } TInstant TKqpSessionCommon::GetTimeToTouchFast() const { - return TimeToTouch_; + return TimeToTouch_.load(std::memory_order_relaxed); } TInstant TKqpSessionCommon::GetTimeInPastFast() const { @@ -146,6 +147,24 @@ TDuration TKqpSessionCommon::GetTimeInterval() const { return TimeInterval_; } +void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) { + CloseHandler_.store(handler, std::memory_order_relaxed); +} + +void TKqpSessionCommon::CloseFromServer(std::weak_ptr client) { + auto strong = client.lock(); + if (!strong) { + // Session closed on the server after stopping client - do nothing + // moreover pool maybe destoyed now + return; + } + + IServerCloseHandler* h = CloseHandler_.load(std::memory_order_relaxed); + if (h) { + h->OnCloseSession(this, strong); + } +} + //////////////////////////////////////////////////////////////////////////////// std::function TKqpSessionCommon::GetSmartDeleter( diff --git 
a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h index feec5fe3383c..454b6e8e4260 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h @@ -14,6 +14,15 @@ namespace NYdb { //////////////////////////////////////////////////////////////////////////////// ui64 GetNodeIdFromSession(const TStringType& sessionId); +class TKqpSessionCommon; + +class IServerCloseHandler { +public: + virtual ~IServerCloseHandler() = default; + // called when session should be closed by server signal + virtual void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr) = 0; +}; + class TKqpSessionCommon : public TEndpointObj { public: TKqpSessionCommon(const TStringType& sessionId, const TStringType& endpoint, @@ -55,6 +64,12 @@ class TKqpSessionCommon : public TEndpointObj { static std::function GetSmartDeleter(std::shared_ptr client); + // Shoult be called under session pool lock + void UpdateServerCloseHandler(IServerCloseHandler*); + + // Called asynchronously from grpc thread. 
+ void CloseFromServer(std::weak_ptr client); + protected: TAdaptiveLock Lock_; @@ -64,15 +79,20 @@ class TKqpSessionCommon : public TEndpointObj { const bool IsOwnedBySessionPool_; EState State_; - TInstant TimeToTouch_; + // This time is used during async close session handling which does not lock the session + // so we need to be able ro read this value atomicaly + std::atomic TimeToTouch_; TInstant TimeInPast_; // Is used to implement progressive timeout for settler keep alive call TDuration TimeInterval_; + + std::atomic CloseHandler_; // Indicate session was in active state, but state was changed // (need to decrement active session counter) // TODO: suboptimal because need lock for atomic change from interceptor // Rewrite with bit field bool NeedUpdateActiveCounter_; + }; } // namespace NYdb diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp index 92ff1929dcde..97b4db71e8f4 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp @@ -141,6 +141,7 @@ void TSessionPool::GetSession(std::unique_ptr ctx) } if (!Sessions_.empty()) { auto it = std::prev(Sessions_.end()); + it->second->UpdateServerCloseHandler(nullptr); sessionImpl = std::move(it->second); Sessions_.erase(it); } @@ -206,6 +207,7 @@ bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) { if (!active) IncrementActiveCounterUnsafe(); } else { + impl->UpdateServerCloseHandler(this); Sessions_.emplace(std::make_pair( impl->GetTimeToTouchFast(), impl)); @@ -242,6 +244,7 @@ void TSessionPool::Drain(std::function&& std::lock_guard guard(Mtx_); Closed_ = close; for (auto it = Sessions_.begin(); it != Sessions_.end();) { + it->second->UpdateServerCloseHandler(nullptr); const bool cont = cb(std::move(it->second)); it = Sessions_.erase(it); if (!cont) @@ -282,6 +285,8 @@ 
TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr weakC if (now < it->second->GetTimeToTouchFast()) break; + it->second->UpdateServerCloseHandler(nullptr); + if (deletePredicate(it->second.get(), sessions.size())) { sessionsToDelete.emplace_back(std::move(it->second)); sessions.erase(it++); @@ -338,6 +343,33 @@ i64 TSessionPool::GetCurrentPoolSize() const { return Sessions_.size(); } +void TSessionPool::OnCloseSession(const TKqpSessionCommon* s, std::shared_ptr client) { + + std::unique_ptr session; + { + std::lock_guard guard(Mtx_); + const auto timeToTouch = s->GetTimeToTouchFast(); + const auto id = s->GetId(); + auto it = Sessions_.find(timeToTouch); + // Sessions_ is multimap of sessions sorted by scheduled time to run periodic task + // Scan sessions with same scheduled time to find needed one. In most cases only one session here + while (it != Sessions_.end() && it->first == timeToTouch) { + if (id != it->second->GetId()) { + it++; + continue; + } + session = std::move(it->second); + Sessions_.erase(it); + break; + } + } + + if (session) { + Y_ABORT_UNLESS(session->GetState() == TKqpSessionCommon::S_IDLE); + CloseAndDeleteSession(std::move(session), client); + } +} + void TSessionPool::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) { ActiveSessionsCounter_.Set(statCollector.ActiveSessions); InPoolSessionsCounter_.Set(statCollector.InPoolSessions); diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h index 6e98e1ded7b7..bbd59ef2ea36 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h @@ -83,7 +83,7 @@ NThreading::TFuture InjectSessionStatusInterception( return promise.GetFuture(); } -class TSessionPool { +class TSessionPool : public IServerCloseHandler { private: class TWaitersQueue { public: @@ 
-126,6 +126,8 @@ class TSessionPool { void Drain(std::function&&)> cb, bool close); void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector); + void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr client) override; + private: void UpdateStats(); static void ReplySessionToUser(TKqpSessionCommon* session, std::unique_ptr ctx); diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index 2a5f47a69611..716cacdfc3b9 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -341,7 +341,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public const auto sessionId = resp->session_id(); request.set_session_id(sessionId); - auto args = std::make_shared(promise, sessionId, endpoint, client); + auto args = std::make_shared(promise, sessionId, endpoint, client, client); // Do not pass client timeout here. Session must be alive TRpcRequestSettings rpcSettings; diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp index c29ea636a716..cd9033369595 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp @@ -8,12 +8,27 @@ namespace NYdb::NQuery { -TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const TString& sessionId, const TString& endpoint) +void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client) { + auto resp = std::make_shared(); + ptr->Read(resp.get(), [resp, ptr, this, client](NYdbGrpc::TGrpcStatus grpcStatus) mutable { + switch (grpcStatus.GRpcStatusCode) { + case grpc::StatusCode::OK: + StartAsyncRead(ptr, client); + break; + case grpc::StatusCode::OUT_OF_RANGE: + CloseFromServer(client); + break; + } + }); +} + +TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const TString& sessionId, const TString& endpoint, std::weak_ptr 
client) : TKqpSessionCommon(sessionId, endpoint, true) , StreamProcessor_(ptr) { MarkActive(); SetNeedUpdateActiveCounter(true); + StartAsyncRead(StreamProcessor_, client); } TSession::TImpl::~TImpl() @@ -53,7 +68,7 @@ void TSession::TImpl::NewSmartShared(TStreamProcessorPtr ptr, std::move(st), TSession( args->Client, - new TSession::TImpl(ptr, args->SessionId, args->Endpoint) + new TSession::TImpl(ptr, args->SessionId, args->Endpoint, args->SessionClient) ) ) ); diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h index 48ed8fb839c8..e34e6440d05e 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h @@ -13,28 +13,32 @@ class TSession::TImpl : public TKqpSessionCommon { TAttachSessionArgs(NThreading::TPromise promise, TString sessionId, TString endpoint, - std::shared_ptr client) + std::shared_ptr client, + std::weak_ptr sessionClient) : Promise(promise) , SessionId(sessionId) , Endpoint(endpoint) , Client(client) + , SessionClient(sessionClient) { } NThreading::TPromise Promise; TString SessionId; TString Endpoint; std::shared_ptr Client; + std::weak_ptr SessionClient; }; using TResponse = Ydb::Query::SessionState; using TStreamProcessorPtr = NYdbGrpc::IStreamRequestReadProcessor::TPtr; - TImpl(TStreamProcessorPtr ptr, const TString& id, const TString& endpoint); + TImpl(TStreamProcessorPtr ptr, const TString& id, const TString& endpoint, std::weak_ptr client); ~TImpl(); static void MakeImplAsync(TStreamProcessorPtr processor, std::shared_ptr args); - private: static void NewSmartShared(TStreamProcessorPtr ptr, std::shared_ptr args, NYdb::TStatus status); + void StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client); + private: TStreamProcessorPtr StreamProcessor_; }; From cc8d78db4969344bcccbf2c2604861aeb6464c9e Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 10 Jan 2025 12:46:43 +0100 
Subject: [PATCH 2/6] Fix locking --- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 11 ++- .../kqp_session_common/kqp_session_common.cpp | 6 +- .../kqp_session_common/kqp_session_common.h | 2 +- .../session_pool/session_pool.cpp | 4 +- .../client/ydb_query/impl/client_session.cpp | 79 +++++++++++++++++-- .../client/ydb_query/impl/client_session.h | 5 +- 6 files changed, 88 insertions(+), 19 deletions(-) diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 2bc2b5223c66..bfe7dc582972 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { WaitForZeroSessions(counters); } - void DoClosedSessionRemovedWhileActioveTest(bool withQuery) { + void DoClosedSessionRemovedWhileActiveTest(bool withQuery) { auto kikimr = DefaultKikimrRunner(); auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); @@ -90,6 +90,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT(result.GetSession().GetId() != id); } + UNIT_ASSERT_VALUES_EQUAL(db.GetActiveSessionCount(), 0); } WaitForZeroSessions(counters); } @@ -99,7 +100,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { // - server close it // - user executes query (got BAD SESSION) // - session should be removed from pool - DoClosedSessionRemovedWhileActioveTest(true); + DoClosedSessionRemovedWhileActiveTest(true); } /* Not implemented in the sdk @@ -108,7 +109,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { // - server close it // - user do not executes any query // - session should be removed from pool - DoClosedSessionRemovedWhileActioveTest(false); + DoClosedSessionRemovedWhileActiveTest(false); } */ // Copy paster from table service but with some modifications for query service @@ 
-282,7 +283,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { WaitForZeroSessions(counters); } - // Check closed session removed while its in the sessio pool + // Check closed session removed while its in the session pool Y_UNIT_TEST(ClosedSessionRemovedFromPool) { auto kikimr = DefaultKikimrRunner(); auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); @@ -291,7 +292,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { { auto db = kikimr.GetQueryClient(); - Cerr << "START TEST...." << Endl; TString id; { auto result = db.GetSession().GetValueSync(); @@ -303,7 +303,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { bool allDoneOk = true; NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk); - Cerr << "DELETED: " << id << Endl; Sleep(TDuration::Seconds(5)); diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp index c6014d853d3b..104b3056a189 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp @@ -148,10 +148,10 @@ TDuration TKqpSessionCommon::GetTimeInterval() const { } void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) { - CloseHandler_.store(handler, std::memory_order_relaxed); + CloseHandler_.store(handler); } -void TKqpSessionCommon::CloseFromServer(std::weak_ptr client) { +void TKqpSessionCommon::CloseFromServer(std::weak_ptr client) noexcept { auto strong = client.lock(); if (!strong) { // Session closed on the server after stopping client - do nothing @@ -159,7 +159,7 @@ void TKqpSessionCommon::CloseFromServer(std::weak_ptr client) { return; } - IServerCloseHandler* h = CloseHandler_.load(std::memory_order_relaxed); + IServerCloseHandler* h = CloseHandler_.load(); if (h) { h->OnCloseSession(this, strong); } diff --git 
a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h index 454b6e8e4260..571ec23954d8 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h @@ -68,7 +68,7 @@ class TKqpSessionCommon : public TEndpointObj { void UpdateServerCloseHandler(IServerCloseHandler*); // Called asynchronously from grpc thread. - void CloseFromServer(std::weak_ptr client); + void CloseFromServer(std::weak_ptr client) noexcept; protected: TAdaptiveLock Lock_; diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp index 97b4db71e8f4..e9abd136fd99 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp @@ -285,12 +285,12 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr weakC if (now < it->second->GetTimeToTouchFast()) break; - it->second->UpdateServerCloseHandler(nullptr); - if (deletePredicate(it->second.get(), sessions.size())) { + it->second->UpdateServerCloseHandler(nullptr); sessionsToDelete.emplace_back(std::move(it->second)); sessions.erase(it++); } else if (cmd) { + it->second->UpdateServerCloseHandler(nullptr); sessionsToTouch.emplace_back(std::move(it->second)); sessions.erase(it++); } else { diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp index cd9033369595..15512bd313c7 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp @@ -5,19 +5,84 @@ #undef INCLUDE_YDB_INTERNAL_H #include +#include namespace NYdb::NQuery { -void 
TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client) { +// Custom lock primitive to protect session from destroying +// during async read execution. +// The problem is currect grpc stream reader has no method to get guarantee +// all callback executed and will not be executed until reader dtor called. +// So we need some way to protect access to row session impl pointer +// from async reader. We can't use shared/weak ptr here because TSessionImpl +// stores as uniq ptr inside session pool and as shared ptr in the TSession +// when user got session. + +// Why just not std::mutex? - Requirement do not destroy a mutex while it is locked +// makes it difficult to use here. + +// TODO: Proably we can add sync version of Cancell method in to grpc reader to make sure +// no more callback will be called. + +class TSafeTSessionImplHolder { + TSession::TImpl* Ptr; + std::atomic_uint32_t Semaphore; +public: + TSafeTSessionImplHolder(TSession::TImpl* p) + : Ptr(p) + , Semaphore(0) + {} + + TSession::TImpl* TrySharedOwning() noexcept { + auto old = Semaphore.fetch_add(1); + if (old == 0) { + return Ptr; + } else { + Y_ABORT_UNLESS(old == 1); + return nullptr; + } + } + + void Release() noexcept { + Semaphore.store(0); + } + + void WhaitAndLock() noexcept { + uint32_t cur; + uint32_t newVal = 1; + do { + cur = Semaphore.load(std::memory_order_relaxed); +#ifndef NDEBUG + if (cur > 1) { + Y_ABORT_UNLESS(false, "unexpected semaphore value"); + } +#endif + if (cur != 0) { + std::this_thread::yield(); + continue; + } + } while (!Semaphore.compare_exchange_weak(cur, newVal, + std::memory_order_release, std::memory_order_relaxed)); + } +}; + +void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client, + std::shared_ptr holder) +{ auto resp = std::make_shared(); - ptr->Read(resp.get(), [resp, ptr, this, client](NYdbGrpc::TGrpcStatus grpcStatus) mutable { + ptr->Read(resp.get(), [resp, ptr, client, holder](NYdbGrpc::TGrpcStatus 
grpcStatus) mutable { switch (grpcStatus.GRpcStatusCode) { case grpc::StatusCode::OK: - StartAsyncRead(ptr, client); + StartAsyncRead(ptr, client, holder); break; - case grpc::StatusCode::OUT_OF_RANGE: - CloseFromServer(client); + case grpc::StatusCode::OUT_OF_RANGE: { + auto impl = holder->TrySharedOwning(); + if (impl) { + impl->CloseFromServer(client); + } + holder->Release(); break; + } } }); } @@ -25,15 +90,17 @@ void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client) : TKqpSessionCommon(sessionId, endpoint, true) , StreamProcessor_(ptr) + , SessionHolder(std::make_shared(this)) { MarkActive(); SetNeedUpdateActiveCounter(true); - StartAsyncRead(StreamProcessor_, client); + StartAsyncRead(StreamProcessor_, client, SessionHolder); } TSession::TImpl::~TImpl() { StreamProcessor_->Cancel(); + SessionHolder->WhaitAndLock(); } void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h index e34e6440d05e..40d6379df2a8 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h @@ -7,6 +7,8 @@ namespace NYdb::NQuery { +class TSafeTSessionImplHolder; + class TSession::TImpl : public TKqpSessionCommon { public: struct TAttachSessionArgs { @@ -37,10 +39,11 @@ class TSession::TImpl : public TKqpSessionCommon { private: static void NewSmartShared(TStreamProcessorPtr ptr, std::shared_ptr args, NYdb::TStatus status); - void StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client); + static void StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client, std::shared_ptr session); private: TStreamProcessorPtr StreamProcessor_; + std::shared_ptr SessionHolder; }; } From f96c8d4a7a449a664ce91d1bf4e6a7b4045b74a4 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 10 Jan 2025 17:38:13 +0100 Subject: [PATCH 3/6] Fix locking --- 
ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 3 +- .../client/ydb_query/impl/client_session.cpp | 35 ++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index bfe7dc582972..9fc4e299c80a 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -112,7 +112,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { DoClosedSessionRemovedWhileActiveTest(false); } */ - // Copy paster from table service but with some modifications for query service + // Copy paste from table service but with some modifications for query service // Checks read iterators/session/sdk counters have expected values Y_UNIT_TEST(CloseSessionsWithLoad) { auto kikimr = std::make_shared(); @@ -194,6 +194,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync(); if (!result.IsSuccess()) { + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_SESSION); Cerr << "received non-success status for session " << id << Endl; return; } diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp index 15512bd313c7..0f194fdbdbe5 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp @@ -21,12 +21,16 @@ namespace NYdb::NQuery { // Why just not std::mutex? - Requirement do not destroy a mutex while it is locked // makes it difficult to use here. -// TODO: Proably we can add sync version of Cancell method in to grpc reader to make sure +// Why thread id? - We destroy session from CloseFromServer call, so the session dtor called from thread +// which already got the lock. + +// TODO: Proably we can add sync version of Cancel method in to grpc reader to make sure // no more callback will be called. 
class TSafeTSessionImplHolder { TSession::TImpl* Ptr; std::atomic_uint32_t Semaphore; + std::atomic OwnerThread; public: TSafeTSessionImplHolder(TSession::TImpl* p) : Ptr(p) @@ -36,33 +40,30 @@ class TSafeTSessionImplHolder { TSession::TImpl* TrySharedOwning() noexcept { auto old = Semaphore.fetch_add(1); if (old == 0) { + OwnerThread.store(std::this_thread::get_id()); return Ptr; } else { - Y_ABORT_UNLESS(old == 1); return nullptr; } } void Release() noexcept { + OwnerThread.store(std::thread::id()); Semaphore.store(0); } - void WhaitAndLock() noexcept { - uint32_t cur; + void WaitAndLock() noexcept { + if (OwnerThread.load() == std::this_thread::get_id()) { + return; + } + + uint32_t cur = 0; uint32_t newVal = 1; - do { - cur = Semaphore.load(std::memory_order_relaxed); -#ifndef NDEBUG - if (cur > 1) { - Y_ABORT_UNLESS(false, "unexpected semaphore value"); - } -#endif - if (cur != 0) { + while (!Semaphore.compare_exchange_weak(cur, newVal, + std::memory_order_release, std::memory_order_relaxed)) { std::this_thread::yield(); - continue; - } - } while (!Semaphore.compare_exchange_weak(cur, newVal, - std::memory_order_release, std::memory_order_relaxed)); + cur = 0; + } } }; @@ -100,7 +101,7 @@ TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const TString& sessionId, const TSession::TImpl::~TImpl() { StreamProcessor_->Cancel(); - SessionHolder->WhaitAndLock(); + SessionHolder->WaitAndLock(); } void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, From e71aaf92ddd70d20008da32688206559f5ca65ac Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 10 Jan 2025 17:45:35 +0100 Subject: [PATCH 4/6] A bit more clean --- ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp index 0f194fdbdbe5..bd378cf2d7a3 100644 --- 
a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp @@ -80,8 +80,8 @@ void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptrTrySharedOwning(); if (impl) { impl->CloseFromServer(client); + holder->Release(); } - holder->Release(); break; } } From f76d8ef1bc93a1fe7e0fd99bc21037764ba9b6b9 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Mon, 13 Jan 2025 12:07:13 +0100 Subject: [PATCH 5/6] Fix comment --- .../client/ydb_query/impl/client_session.cpp | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp index bd378cf2d7a3..807be98c2e48 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp @@ -11,20 +11,23 @@ namespace NYdb::NQuery { // Custom lock primitive to protect session from destroying // during async read execution. -// The problem is currect grpc stream reader has no method to get guarantee -// all callback executed and will not be executed until reader dtor called. +// The problem is that TSession::TImpl holds the grpc stream processor by IntrusivePtr +// and this processor is already refcounted by internal code. +// That means the TSession::TImpl dtor gives no guarantee that the grpc processor will be destroyed. +// StreamProcessor_->Cancel() doesn't help: it just starts async cancellation, but we have no way +// to wait until the cancellation is done. // So we need some way to protect access to row session impl pointer -// from async reader. We can't use shared/weak ptr here because TSessionImpl +// from async reader (processor callback). We can't use shared/weak ptr here because TSessionImpl // stores as uniq ptr inside session pool and as shared ptr in the TSession -// when user got session. +// when user got session (see GetSmartDeleter related code). 
// Why just not std::mutex? - Requirement do not destroy a mutex while it is locked -// makes it difficult to use here. +// makes it difficult to use here. Moreover, we need to allow a recursive lock. -// Why thread id? - We destroy session from CloseFromServer call, so the session dtor called from thread -// which already got the lock. +// Why a recursive lock? - In the happy path we destroy the session from the CloseFromServer call, +// so the session dtor is called from the thread which already holds the lock. -// TODO: Proably we can add sync version of Cancel method in to grpc reader to make sure +// TODO: Probably we can add a sync version of the Cancel method to the grpc stream processor to make sure // no more callback will be called. class TSafeTSessionImplHolder { From f4fbcba83ecb1f4fdc5150bf24d67dd30215616b Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Wed, 15 Jan 2025 13:46:08 +0100 Subject: [PATCH 6/6] Update ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h Co-authored-by: Bulat --- .../impl/ydb_internal/kqp_session_common/kqp_session_common.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h index 571ec23954d8..96f1e9c50983 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h @@ -80,7 +80,7 @@ class TKqpSessionCommon : public TEndpointObj { EState State_; // This time is used during async close session handling which does not lock the session - // so we need to be able ro read this value atomicaly + // so we need to be able to read this value atomically std::atomic TimeToTouch_; TInstant TimeInPast_; // Is used to implement progressive timeout for settler keep alive call