From f3229908a211e80f79d1bec307cc0f486e447fab Mon Sep 17 00:00:00 2001
From: Daniil Cherednik
Date: Wed, 15 Jan 2025 19:34:59 +0100
Subject: [PATCH] [C++ SDK] Remove session from session pool if stream session
 was closed by server side. (#13199)

Co-authored-by: Bulat
---
 ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 221 +++++++++++++++++-
 .../kqp_session_common/kqp_session_common.cpp |  25 +-
 .../kqp_session_common/kqp_session_common.h   |  22 +-
 .../session_pool/session_pool.cpp             |  32 +++
 .../ydb_internal/session_pool/session_pool.h  |   4 +-
 .../sdk/cpp/client/ydb_query/client.cpp       |   2 +-
 .../client/ydb_query/impl/client_session.cpp  |  90 ++++++-
 .../client/ydb_query/impl/client_session.h    |  13 +-
 8 files changed, 394 insertions(+), 15 deletions(-)

diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index db0476149f67..9fc4e299c80a 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -11,6 +11,8 @@
 #include
 #include
+#include
+#include
 #include

@@ -54,7 +56,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
         WaitForZeroSessions(counters);
     }

-    Y_UNIT_TEST(QueryOnClosedSession) {
+    void DoClosedSessionRemovedWhileActiveTest(bool withQuery) {
         auto kikimr = DefaultKikimrRunner();
         auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
         NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
@@ -75,10 +77,12 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {

                 UNIT_ASSERT(allDoneOk);

-                auto execResult = session.ExecuteQuery("SELECT 1;",
-                    NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+                if (withQuery) {
+                    auto execResult = session.ExecuteQuery("SELECT 1;",
+                        NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();

-                UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION);
+                    UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION);
+                }
             }
             // closed session must be removed from session pool
             {
@@ -86,10 +90,121 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
                 UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
                 UNIT_ASSERT(result.GetSession().GetId() != id);
             }
+            UNIT_ASSERT_VALUES_EQUAL(db.GetActiveSessionCount(), 0);
         }

         WaitForZeroSessions(counters);
     }

+    Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithQuery) {
+        // - Session is active (user got it)
+        // - server closes it
+        // - user executes a query (gets BAD_SESSION)
+        // - session should be removed from the pool
+        DoClosedSessionRemovedWhileActiveTest(true);
+    }
+
+/* Not implemented in the sdk
+    Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithoutQuery) {
+        // - Session is active (user got it)
+        // - server closes it
+        // - user does not execute any query
+        // - session should be removed from the pool
+        DoClosedSessionRemovedWhileActiveTest(false);
+    }
+*/
+    // Copied from the table service, with some modifications for the query service.
+    // Checks that read iterator/session/sdk counters have the expected values.
+    Y_UNIT_TEST(CloseSessionsWithLoad) {
+        auto kikimr = std::make_shared();
+        kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
+        kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
+        kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
+        kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);
+
+        NYdb::NQuery::TQueryClient db = kikimr->GetQueryClient();
+
+        const ui32 SessionsCount = 50;
+        const TDuration WaitDuration = TDuration::Seconds(1);
+
+        TVector sessions;
+        for (ui32 i = 0; i < SessionsCount; ++i) {
+            auto sessionResult = db.GetSession().GetValueSync();
+            UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
+
+            sessions.push_back(sessionResult.GetSession());
+        }
+
+        NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1);
+        NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable {
+            if (id == (i32)sessions.size()) {
+                Sleep(WaitDuration);
+                Cerr << "start sessions close....." << Endl;
+                auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr->GetEndpoint());
+                for (ui32 i = 0; i < sessions.size(); ++i) {
+                    bool allDoneOk = true;
+                    NTestHelpers::CheckDelete(clientConfig, sessions[i].GetId(), Ydb::StatusIds::SUCCESS, allDoneOk);
+                    UNIT_ASSERT(allDoneOk);
+                }
+
+                Cerr << "finished sessions close....." << Endl;
+                auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb");
+
+                ui64 pendingCompilations = 0;
+                do {
+                    Sleep(WaitDuration);
+                    pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val();
+                    Cerr << "still compiling... " << pendingCompilations << Endl;
+                } while (pendingCompilations != 0);
+
+                ui64 pendingSessions = 0;
+                do {
+                    Sleep(WaitDuration);
+                    pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val();
+                    Cerr << "still active sessions ... " << pendingSessions << Endl;
+                } while (pendingSessions != 0);
+
+                return;
+            }
+
+            auto session = sessions[id];
+            TMaybe tx;
+
+            while (true) {
+                if (tx) {
+                    auto result = tx->Commit().GetValueSync();
+                    if (!result.IsSuccess()) {
+                        return;
+                    }
+
+                    tx = {};
+                    continue;
+                }
+
+                auto query = Sprintf(R"(
+                    SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0;
+                    SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1;
+                    SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2;
+                    SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3;
+                    SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4;
+                    SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5;
+
+                    UPSERT INTO `/Root/EightShard` (Key, Text) VALUES
+                        (%2$dul, "New");
+                )", RandomNumber(), RandomNumber());
+
+                auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync();
+                if (!result.IsSuccess()) {
+                    UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_SESSION);
+                    Cerr << "received non-success status for session " << id << Endl;
+                    return;
+                }
+
+                tx = result.GetTransaction();
+            }
+        }, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+        WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
+    }
+
     Y_UNIT_TEST(PeriodicTaskInSessionPool) {
         auto kikimr = DefaultKikimrRunner();
         auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
@@ -169,6 +284,104 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
         WaitForZeroSessions(counters);
     }

+    // Check that a closed session is removed while it is in the session pool
+    Y_UNIT_TEST(ClosedSessionRemovedFromPool) {
+        auto kikimr = DefaultKikimrRunner();
+        auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
+        NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+        {
+            auto db = kikimr.GetQueryClient();
+
+            TString id;
+            {
+                auto result = db.GetSession().GetValueSync();
+                UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+                UNIT_ASSERT(result.GetSession().GetId());
+                auto session = result.GetSession();
+                id = session.GetId();
+            }
+
+            bool allDoneOk = true;
+            NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk);
+
+            Sleep(TDuration::Seconds(5));
+
+            UNIT_ASSERT(allDoneOk);
+            {
+                auto result = db.GetSession().GetValueSync();
+                UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+                auto newSession = result.GetSession();
+                UNIT_ASSERT_C(newSession.GetId() != id, "closed id: " << id << " new id: " << newSession.GetId());
+
+                auto execResult = newSession.ExecuteQuery("SELECT 1;",
+                    NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+
+                UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
+            }
+        }
+        WaitForZeroSessions(counters);
+    }
+
+    // Attempt to trigger a simultaneous server-side close and session return.
+    // From the SDK perspective, check that there are no data races.
+    Y_UNIT_TEST(ReturnAndCloseSameTime) {
+        auto kikimr = DefaultKikimrRunner();
+        auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
+        NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+        size_t iterations = 999;
+        auto db = kikimr.GetQueryClient();
+
+        NPar::LocalExecutor().RunAdditionalThreads(2);
+        while (iterations--) {
+            auto lim = iterations % 33;
+            TVector sessions;
+            TVector sids;
+            sessions.reserve(lim);
+            sids.reserve(lim);
+            for (size_t i = 0; i < lim; ++i) {
+                auto sessionResult = db.GetSession().GetValueSync();
+                UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
+
+                sessions.push_back(sessionResult.GetSession());
+                sids.push_back(sessions.back().GetId());
+            }
+
+            if (iterations & 1) {
+                auto rng = std::default_random_engine {};
+                std::ranges::shuffle(sids, rng);
+            }
+
+            NPar::LocalExecutor().ExecRange([sessions{std::move(sessions)}, sids{std::move(sids)}, clientConfig](int id) mutable {
+                if (id == 0) {
+                    for (size_t i = 0; i < sessions.size(); i++) {
+                        auto s = std::move(sessions[i]);
+                        auto execResult = s.ExecuteQuery("SELECT 1;",
+                            NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+                        switch (execResult.GetStatus()) {
+                            case EStatus::SUCCESS:
+                            case EStatus::BAD_SESSION:
+                                break;
+                            default:
+                                UNIT_ASSERT_C(false, "unexpected status: " << execResult.GetStatus());
+                        }
+                    }
+                } else if (id == 1) {
+                    for (size_t i = 0; i < sids.size(); i++) {
+                        bool allDoneOk = true;
+                        NTestHelpers::CheckDelete(clientConfig, sids[i], Ydb::StatusIds::SUCCESS, allDoneOk);
+                        UNIT_ASSERT(allDoneOk);
+                    }
+                } else {
+                    Y_ABORT_UNLESS(false, "unexpected thread count");
+                }
+            }, 0, 2, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+        }
+
+        WaitForZeroSessions(counters);
+    }
+
     Y_UNIT_TEST(StreamExecuteQueryPure) {
         auto kikimr = DefaultKikimrRunner();
         auto db = kikimr.GetQueryClient();
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
index 4cb5361cf0e8..104b3056a189 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
@@ -36,6 +36,7 @@ TKqpSessionCommon::TKqpSessionCommon(
     , State_(S_STANDALONE)
     , TimeToTouch_(TInstant::Now())
     , TimeInPast_(TInstant::Now())
+    , CloseHandler_(nullptr)
     , NeedUpdateActiveCounter_(false)
 {}

@@ -115,7 +116,7 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval,
         if (updateTimeInPast) {
             TimeInPast_ = now;
         }
-        TimeToTouch_ = now + interval;
+        TimeToTouch_.store(now + interval, std::memory_order_relaxed);
     }
 }

@@ -126,11 +127,11 @@ void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval,
     if (updateTimeInPast) {
         TimeInPast_ = now;
     }
-    TimeToTouch_ = now + interval;
+    TimeToTouch_.store(now + interval, std::memory_order_relaxed);
 }

 TInstant TKqpSessionCommon::GetTimeToTouchFast() const {
-    return TimeToTouch_;
+    return TimeToTouch_.load(std::memory_order_relaxed);
 }

 TInstant TKqpSessionCommon::GetTimeInPastFast() const {
@@ -146,6 +147,24 @@ TDuration TKqpSessionCommon::GetTimeInterval() const {
     return TimeInterval_;
 }

+void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) {
+    CloseHandler_.store(handler);
+}
+
+void TKqpSessionCommon::CloseFromServer(std::weak_ptr client) noexcept {
+    auto strong = client.lock();
+    if (!strong) {
+        // The session was closed on the server after the client was stopped - do nothing;
+        // moreover, the pool may already be destroyed.
+        return;
+    }
+
+    IServerCloseHandler* h = CloseHandler_.load();
+    if (h) {
+        h->OnCloseSession(this, strong);
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////

 std::function TKqpSessionCommon::GetSmartDeleter(
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
index feec5fe3383c..96f1e9c50983 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
@@ -14,6 +14,15 @@ namespace NYdb {
 ////////////////////////////////////////////////////////////////////////////////
 ui64 GetNodeIdFromSession(const TStringType& sessionId);

+class TKqpSessionCommon;
+
+class IServerCloseHandler {
+public:
+    virtual ~IServerCloseHandler() = default;
+    // Called when the session should be closed because of a server signal
+    virtual void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr) = 0;
+};
+
 class TKqpSessionCommon : public TEndpointObj {
 public:
     TKqpSessionCommon(const TStringType& sessionId, const TStringType& endpoint,
@@ -55,6 +64,12 @@ class TKqpSessionCommon : public TEndpointObj {
     static std::function GetSmartDeleter(std::shared_ptr client);

+    // Should be called under the session pool lock
+    void UpdateServerCloseHandler(IServerCloseHandler*);
+
+    // Called asynchronously from a grpc thread.
+    void CloseFromServer(std::weak_ptr client) noexcept;
+
 protected:
     TAdaptiveLock Lock_;

@@ -64,15 +79,20 @@ class TKqpSessionCommon : public TEndpointObj {
     const bool IsOwnedBySessionPool_;

     EState State_;
-    TInstant TimeToTouch_;
+    // This time is used during async close-session handling, which does not lock the session,
+    // so we need to be able to read this value atomically.
+    std::atomic TimeToTouch_;
     TInstant TimeInPast_;
     // Is used to implement progressive timeout for settler keep alive call
     TDuration TimeInterval_;
+
+    std::atomic CloseHandler_;
     // Indicate session was in active state, but state was changed
     // (need to decrement active session counter)
     // TODO: suboptimal because need lock for atomic change from interceptor
     // Rewrite with bit field
     bool NeedUpdateActiveCounter_;
+
 };

 } // namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
index 92ff1929dcde..e9abd136fd99 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
@@ -141,6 +141,7 @@ void TSessionPool::GetSession(std::unique_ptr ctx)
         }
         if (!Sessions_.empty()) {
             auto it = std::prev(Sessions_.end());
+            it->second->UpdateServerCloseHandler(nullptr);
             sessionImpl = std::move(it->second);
             Sessions_.erase(it);
         }
@@ -206,6 +207,7 @@ bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) {
         if (!active)
             IncrementActiveCounterUnsafe();
     } else {
+        impl->UpdateServerCloseHandler(this);
         Sessions_.emplace(std::make_pair(
             impl->GetTimeToTouchFast(),
             impl));
@@ -242,6 +244,7 @@ void TSessionPool::Drain(std::function&&
     std::lock_guard guard(Mtx_);
     Closed_ = close;
     for (auto it = Sessions_.begin(); it != Sessions_.end();) {
+        it->second->UpdateServerCloseHandler(nullptr);
         const bool cont = cb(std::move(it->second));
         it = Sessions_.erase(it);
         if (!cont)
@@ -283,9 +286,11 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr weakC
                     break;

                 if (deletePredicate(it->second.get(), sessions.size())) {
+                    it->second->UpdateServerCloseHandler(nullptr);
                     sessionsToDelete.emplace_back(std::move(it->second));
                     sessions.erase(it++);
                 } else if (cmd) {
+                    it->second->UpdateServerCloseHandler(nullptr);
                     sessionsToTouch.emplace_back(std::move(it->second));
                     sessions.erase(it++);
                 } else {
@@ -338,6 +343,33 @@ i64 TSessionPool::GetCurrentPoolSize() const {
     return Sessions_.size();
 }

+void TSessionPool::OnCloseSession(const TKqpSessionCommon* s, std::shared_ptr client) {
+
+    std::unique_ptr session;
+    {
+        std::lock_guard guard(Mtx_);
+        const auto timeToTouch = s->GetTimeToTouchFast();
+        const auto id = s->GetId();
+        auto it = Sessions_.find(timeToTouch);
+        // Sessions_ is a multimap of sessions sorted by the scheduled time of the periodic task run.
+        // Scan the sessions with the same scheduled time to find the needed one;
+        // in most cases there is only one session here.
+        while (it != Sessions_.end() && it->first == timeToTouch) {
+            if (id != it->second->GetId()) {
+                it++;
+                continue;
+            }
+            session = std::move(it->second);
+            Sessions_.erase(it);
+            break;
+        }
+    }
+
+    if (session) {
+        Y_ABORT_UNLESS(session->GetState() == TKqpSessionCommon::S_IDLE);
+        CloseAndDeleteSession(std::move(session), client);
+    }
+}
+
 void TSessionPool::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) {
     ActiveSessionsCounter_.Set(statCollector.ActiveSessions);
     InPoolSessionsCounter_.Set(statCollector.InPoolSessions);
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h
index 6e98e1ded7b7..bbd59ef2ea36 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h
@@ -83,7 +83,7 @@ NThreading::TFuture InjectSessionStatusInterception(
     return promise.GetFuture();
 }

-class TSessionPool {
+class TSessionPool : public IServerCloseHandler {
 private:
     class TWaitersQueue {
     public:
@@ -126,6 +126,8 @@ class TSessionPool {
     void Drain(std::function&&)> cb, bool close);
     void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector);

+    void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr client) override;
+
 private:
     void UpdateStats();
     static void ReplySessionToUser(TKqpSessionCommon* session, std::unique_ptr ctx);
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index 2a5f47a69611..716cacdfc3b9 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -341,7 +341,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public
                 const auto sessionId = resp->session_id();
                 request.set_session_id(sessionId);

-                auto args = std::make_shared(promise, sessionId, endpoint, client);
+                auto args = std::make_shared(promise, sessionId, endpoint, client, client);
                 // Do not pass client timeout here. Session must be alive
                 TRpcRequestSettings rpcSettings;
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
index c29ea636a716..807be98c2e48 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
@@ -5,20 +5,106 @@
 #undef INCLUDE_YDB_INTERNAL_H

 #include
+#include

 namespace NYdb::NQuery {

-TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const TString& sessionId, const TString& endpoint)
+// Custom lock primitive to protect the session from being destroyed
+// during async read execution.
+// The problem: TSession::TImpl holds the grpc stream processor by IntrusivePtr,
+// and this processor is already refcounted by internal code.
+// That means there is no guarantee the grpc processor is destroyed during the TSession::TImpl dtor.
+// StreamProcessor_->Cancel() doesn't help: it only starts async cancellation, and we have no way
+// to wait until the cancellation is done.
+// So we need some way to protect access to the raw session impl pointer
+// from the async reader (processor callback). We can't use a shared/weak ptr here because TSessionImpl
+// is stored as a unique ptr inside the session pool and as a shared ptr in the TSession
+// when the user has got the session (see the GetSmartDeleter related code).
+
+// Why not just std::mutex? - The requirement not to destroy a mutex while it is locked
+// makes it difficult to use here. Moreover, we need to allow recursive locking.
+
+// Why a recursive lock? - In the happy path we destroy the session from the CloseFromServer call,
+// so the session dtor is called from the thread which already holds the lock.
+
+// TODO: Probably we can add a sync version of the Cancel method to the grpc stream processor
+// to make sure no more callbacks will be called.
+
+class TSafeTSessionImplHolder {
+    TSession::TImpl* Ptr;
+    std::atomic_uint32_t Semaphore;
+    std::atomic OwnerThread;
+public:
+    TSafeTSessionImplHolder(TSession::TImpl* p)
+        : Ptr(p)
+        , Semaphore(0)
+    {}
+
+    TSession::TImpl* TrySharedOwning() noexcept {
+        auto old = Semaphore.fetch_add(1);
+        if (old == 0) {
+            OwnerThread.store(std::this_thread::get_id());
+            return Ptr;
+        } else {
+            return nullptr;
+        }
+    }
+
+    void Release() noexcept {
+        OwnerThread.store(std::thread::id());
+        Semaphore.store(0);
+    }
+
+    void WaitAndLock() noexcept {
+        if (OwnerThread.load() == std::this_thread::get_id()) {
+            return;
+        }
+
+        uint32_t cur = 0;
+        uint32_t newVal = 1;
+        while (!Semaphore.compare_exchange_weak(cur, newVal,
+            std::memory_order_release, std::memory_order_relaxed)) {
+            std::this_thread::yield();
+            cur = 0;
+        }
+    }
+};
+
+void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client,
+    std::shared_ptr holder)
+{
+    auto resp = std::make_shared();
+    ptr->Read(resp.get(), [resp, ptr, client, holder](NYdbGrpc::TGrpcStatus grpcStatus) mutable {
+        switch (grpcStatus.GRpcStatusCode) {
+            case grpc::StatusCode::OK:
+                StartAsyncRead(ptr, client, holder);
+                break;
+            case grpc::StatusCode::OUT_OF_RANGE: {
+                auto impl = holder->TrySharedOwning();
+                if (impl) {
+                    impl->CloseFromServer(client);
+                    holder->Release();
+                }
+                break;
+            }
+        }
+    });
+}
+
+TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const TString& sessionId, const TString& endpoint, std::weak_ptr client)
     : TKqpSessionCommon(sessionId, endpoint, true)
     , StreamProcessor_(ptr)
+    , SessionHolder(std::make_shared(this))
 {
     MarkActive();
     SetNeedUpdateActiveCounter(true);
+    StartAsyncRead(StreamProcessor_, client, SessionHolder);
 }

 TSession::TImpl::~TImpl() {
     StreamProcessor_->Cancel();
+    SessionHolder->WaitAndLock();
 }

 void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr,
@@ -53,7 +139,7 @@ void TSession::TImpl::NewSmartShared(TStreamProcessorPtr ptr,
                 std::move(st),
                 TSession(
                     args->Client,
-                    new TSession::TImpl(ptr, args->SessionId, args->Endpoint)
+                    new TSession::TImpl(ptr, args->SessionId, args->Endpoint, args->SessionClient)
                 )
             )
         );
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h
index 48ed8fb839c8..40d6379df2a8 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h
@@ -7,36 +7,43 @@

 namespace NYdb::NQuery {

+class TSafeTSessionImplHolder;
+
 class TSession::TImpl : public TKqpSessionCommon {
 public:
     struct TAttachSessionArgs {
         TAttachSessionArgs(NThreading::TPromise promise,
             TString sessionId,
             TString endpoint,
-            std::shared_ptr client)
+            std::shared_ptr client,
+            std::weak_ptr sessionClient)
             : Promise(promise)
             , SessionId(sessionId)
             , Endpoint(endpoint)
             , Client(client)
+            , SessionClient(sessionClient)
         { }
         NThreading::TPromise Promise;
         TString SessionId;
         TString Endpoint;
         std::shared_ptr Client;
+        std::weak_ptr SessionClient;
     };

     using TResponse = Ydb::Query::SessionState;
     using TStreamProcessorPtr = NYdbGrpc::IStreamRequestReadProcessor::TPtr;
-    TImpl(TStreamProcessorPtr ptr, const TString& id, const TString& endpoint);
+    TImpl(TStreamProcessorPtr ptr, const TString& id, const TString& endpoint, std::weak_ptr client);
     ~TImpl();

     static void MakeImplAsync(TStreamProcessorPtr processor, std::shared_ptr args);

-private:
     static void NewSmartShared(TStreamProcessorPtr ptr, std::shared_ptr args, NYdb::TStatus status);

+    static void StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client, std::shared_ptr session);
+
+private:
     TStreamProcessorPtr StreamProcessor_;
+    std::shared_ptr SessionHolder;
 };

 }
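
Note on the TSessionPool::OnCloseSession hunk above: the pool keeps idle sessions in a multimap keyed by their scheduled touch time, so the key alone is not unique and the lookup has to walk the equal range and match by session id. The standalone sketch below shows the same lookup pattern in isolation; it is not part of the patch, and TToySession, TToyPool and TakeClosed are hypothetical names used only for illustration.

#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>

// Hypothetical, simplified stand-ins for TKqpSessionCommon and TSessionPool.
struct TToySession {
    std::string Id;
    uint64_t TimeToTouch; // the key the pool sorts idle sessions by
};

class TToyPool {
public:
    void Put(std::unique_ptr<TToySession> s) {
        const uint64_t key = s->TimeToTouch;
        Sessions_.emplace(key, std::move(s));
    }

    // Mirrors the idea of OnCloseSession: find() lands somewhere inside the
    // equal range for the touch time, so we scan that range and match by id.
    std::unique_ptr<TToySession> TakeClosed(uint64_t timeToTouch, const std::string& id) {
        auto it = Sessions_.find(timeToTouch);
        while (it != Sessions_.end() && it->first == timeToTouch) {
            if (it->second->Id != id) {
                ++it;
                continue;
            }
            auto session = std::move(it->second);
            Sessions_.erase(it);
            return session;
        }
        return nullptr; // already handed to a user or taken by the periodic task
    }

private:
    std::multimap<uint64_t, std::unique_ptr<TToySession>> Sessions_;
};

int main() {
    TToyPool pool;
    pool.Put(std::unique_ptr<TToySession>(new TToySession{"a", 10}));
    pool.Put(std::unique_ptr<TToySession>(new TToySession{"b", 10})); // same touch time, different id
    auto removed = pool.TakeClosed(10, "b");
    std::cout << (removed ? removed->Id : std::string("<none>")) << std::endl; // prints "b"
    return 0;
}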
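
Note on the client_session.cpp hunk above: the holder gives the grpc read callback a non-blocking attempt at shared ownership of the raw impl pointer, while the session destructor spins until it owns the guard (or returns immediately when it runs on the thread that already owns it). A minimal sketch of that protocol follows; it is not part of the patch, uses hypothetical names (TToyGuard, TryLock, Unlock), and simplifies the memory ordering to the sequentially consistent default.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>

class TToyGuard {
public:
    // Non-blocking attempt, analogous to TrySharedOwning(): succeeds only if nobody holds the guard.
    bool TryLock() noexcept {
        if (Semaphore_.fetch_add(1) == 0) {
            Owner_.store(std::this_thread::get_id());
            return true;
        }
        return false;
    }

    // Analogous to Release(): clear the owner and reopen the guard.
    void Unlock() noexcept {
        Owner_.store(std::thread::id());
        Semaphore_.store(0);
    }

    // Analogous to WaitAndLock(): the destructor path spins until it owns the guard,
    // except when it already runs on the owning thread (the recursive case).
    void WaitAndLock() noexcept {
        if (Owner_.load() == std::this_thread::get_id()) {
            return;
        }
        uint32_t expected = 0;
        while (!Semaphore_.compare_exchange_weak(expected, 1)) {
            expected = 0;
            std::this_thread::yield();
        }
    }

private:
    std::atomic_uint32_t Semaphore_{0};
    std::atomic<std::thread::id> Owner_{std::thread::id()};
};

int main() {
    TToyGuard guard;
    std::thread reader([&guard] {       // plays the role of the grpc read callback
        if (guard.TryLock()) {
            std::puts("callback: owns the session impl");
            guard.Unlock();
        }
    });
    guard.WaitAndLock();                // plays the role of ~TImpl(): the impl must not be used afterwards
    std::puts("dtor: safe to destroy the session");
    reader.join();
    return 0;
}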