Skip to content

Commit

Permalink
[C++ SDK] Remove session from session pool if stream session was clos…
Browse files Browse the repository at this point in the history
…ed by server side. (#13199)

Co-authored-by: Bulat <[email protected]>
  • Loading branch information
dcherednik and Gazizonoki authored Jan 15, 2025
1 parent ef3837c commit f322990
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 15 deletions.
221 changes: 217 additions & 4 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>

#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/base/counters.h>
#include <library/cpp/threading/local_executor/local_executor.h>

#include <fmt/format.h>

Expand Down Expand Up @@ -54,7 +56,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
WaitForZeroSessions(counters);
}

Y_UNIT_TEST(QueryOnClosedSession) {
void DoClosedSessionRemovedWhileActiveTest(bool withQuery) {
auto kikimr = DefaultKikimrRunner();
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
Expand All @@ -75,21 +77,134 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {

UNIT_ASSERT(allDoneOk);

auto execResult = session.ExecuteQuery("SELECT 1;",
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
if (withQuery) {
auto execResult = session.ExecuteQuery("SELECT 1;",
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION);
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION);
}
}
// closed session must be removed from session pool
{
auto result = db.GetSession().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT(result.GetSession().GetId() != id);
}
UNIT_ASSERT_VALUES_EQUAL(db.GetActiveSessionCount(), 0);
}
WaitForZeroSessions(counters);
}

Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithQuery) {
// - Session is active (user gfot it)
// - server close it
// - user executes query (got BAD SESSION)
// - session should be removed from pool
DoClosedSessionRemovedWhileActiveTest(true);
}

/* Not implemented in the sdk
Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithoutQuery) {
// - Session is active (user gfot it)
// - server close it
// - user do not executes any query
// - session should be removed from pool
DoClosedSessionRemovedWhileActiveTest(false);
}
*/
// Copy paste from table service but with some modifications for query service
// Checks read iterators/session/sdk counters have expected values
Y_UNIT_TEST(CloseSessionsWithLoad) {
auto kikimr = std::make_shared<TKikimrRunner>();
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);

NYdb::NQuery::TQueryClient db = kikimr->GetQueryClient();

const ui32 SessionsCount = 50;
const TDuration WaitDuration = TDuration::Seconds(1);

TVector<NYdb::NQuery::TQueryClient::TSession> sessions;
for (ui32 i = 0; i < SessionsCount; ++i) {
auto sessionResult = db.GetSession().GetValueSync();
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());

sessions.push_back(sessionResult.GetSession());
}

NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1);
NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable {
if (id == (i32)sessions.size()) {
Sleep(WaitDuration);
Cerr << "start sessions close....." << Endl;
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr->GetEndpoint());
for (ui32 i = 0; i < sessions.size(); ++i) {
bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, sessions[i].GetId(), Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);
}

Cerr << "finished sessions close....." << Endl;
auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb");

ui64 pendingCompilations = 0;
do {
Sleep(WaitDuration);
pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val();
Cerr << "still compiling... " << pendingCompilations << Endl;
} while (pendingCompilations != 0);

ui64 pendingSessions = 0;
do {
Sleep(WaitDuration);
pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val();
Cerr << "still active sessions ... " << pendingSessions << Endl;
} while (pendingSessions != 0);

return;
}

auto session = sessions[id];
TMaybe<TTransaction> tx;

while (true) {
if (tx) {
auto result = tx->Commit().GetValueSync();
if (!result.IsSuccess()) {
return;
}

tx = {};
continue;
}

auto query = Sprintf(R"(
SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0;
SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1;
SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2;
SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3;
SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4;
SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5;
UPSERT INTO `/Root/EightShard` (Key, Text) VALUES
(%2$dul, "New");
)", RandomNumber<ui32>(), RandomNumber<ui32>());

auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync();
if (!result.IsSuccess()) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_SESSION);
Cerr << "received non-success status for session " << id << Endl;
return;
}

tx = result.GetTransaction();
}
}, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
}

Y_UNIT_TEST(PeriodicTaskInSessionPool) {
auto kikimr = DefaultKikimrRunner();
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
Expand Down Expand Up @@ -169,6 +284,104 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
WaitForZeroSessions(counters);
}

// Check closed session removed while its in the session pool
Y_UNIT_TEST(ClosedSessionRemovedFromPool) {
auto kikimr = DefaultKikimrRunner();
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);

{
auto db = kikimr.GetQueryClient();

TString id;
{
auto result = db.GetSession().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT(result.GetSession().GetId());
auto session = result.GetSession();
id = session.GetId();
}

bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk);

Sleep(TDuration::Seconds(5));

UNIT_ASSERT(allDoneOk);
{
auto result = db.GetSession().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
auto newSession = result.GetSession();
UNIT_ASSERT_C(newSession.GetId() != id, "closed id: " << id << " new id: " << newSession.GetId());

auto execResult = newSession.ExecuteQuery("SELECT 1;",
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
}
}
WaitForZeroSessions(counters);
}

// Attempt to trigger simultanous server side close and return session
// From sdk perspective check no dataraces
Y_UNIT_TEST(ReturnAndCloseSameTime) {
auto kikimr = DefaultKikimrRunner();
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);

size_t iterations = 999;
auto db = kikimr.GetQueryClient();

NPar::LocalExecutor().RunAdditionalThreads(2);
while (iterations--) {
auto lim = iterations % 33;
TVector<NYdb::NQuery::TQueryClient::TSession> sessions;
TVector<TString> sids;
sessions.reserve(lim);
sids.reserve(lim);
for (size_t i = 0; i < lim; ++i) {
auto sessionResult = db.GetSession().GetValueSync();
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());

sessions.push_back(sessionResult.GetSession());
sids.push_back(sessions.back().GetId());
}

if (iterations & 1) {
auto rng = std::default_random_engine {};
std::ranges::shuffle(sids, rng);
}

NPar::LocalExecutor().ExecRange([sessions{std::move(sessions)}, sids{std::move(sids)}, clientConfig](int id) mutable{
if (id == 0) {
for (size_t i = 0; i < sessions.size(); i++) {
auto s = std::move(sessions[i]);
auto execResult = s.ExecuteQuery("SELECT 1;",
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
switch (execResult.GetStatus()) {
case EStatus::SUCCESS:
case EStatus::BAD_SESSION:
break;
default:
UNIT_ASSERT_C(false, "unexpected status: " << execResult.GetStatus());
}
}
} else if (id == 1) {
for (size_t i = 0; i < sids.size(); i++) {
bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, sids[i], Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);
}
} else {
Y_ABORT_UNLESS(false, "unexpected thread cxount");
}
}, 0, 2, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
}

WaitForZeroSessions(counters);
}

Y_UNIT_TEST(StreamExecuteQueryPure) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ TKqpSessionCommon::TKqpSessionCommon(
, State_(S_STANDALONE)
, TimeToTouch_(TInstant::Now())
, TimeInPast_(TInstant::Now())
, CloseHandler_(nullptr)
, NeedUpdateActiveCounter_(false)
{}

Expand Down Expand Up @@ -115,7 +116,7 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval,
if (updateTimeInPast) {
TimeInPast_ = now;
}
TimeToTouch_ = now + interval;
TimeToTouch_.store(now + interval, std::memory_order_relaxed);
}
}

Expand All @@ -126,11 +127,11 @@ void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval,
if (updateTimeInPast) {
TimeInPast_ = now;
}
TimeToTouch_ = now + interval;
TimeToTouch_.store(now + interval, std::memory_order_relaxed);
}

TInstant TKqpSessionCommon::GetTimeToTouchFast() const {
return TimeToTouch_;
return TimeToTouch_.load(std::memory_order_relaxed);
}

TInstant TKqpSessionCommon::GetTimeInPastFast() const {
Expand All @@ -146,6 +147,24 @@ TDuration TKqpSessionCommon::GetTimeInterval() const {
return TimeInterval_;
}

void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) {
CloseHandler_.store(handler);
}

void TKqpSessionCommon::CloseFromServer(std::weak_ptr<ISessionClient> client) noexcept {
auto strong = client.lock();
if (!strong) {
// Session closed on the server after stopping client - do nothing
// moreover pool maybe destoyed now
return;
}

IServerCloseHandler* h = CloseHandler_.load();
if (h) {
h->OnCloseSession(this, strong);
}
}

////////////////////////////////////////////////////////////////////////////////

std::function<void(TKqpSessionCommon*)> TKqpSessionCommon::GetSmartDeleter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ namespace NYdb {
////////////////////////////////////////////////////////////////////////////////
ui64 GetNodeIdFromSession(const TStringType& sessionId);

class TKqpSessionCommon;

class IServerCloseHandler {
public:
virtual ~IServerCloseHandler() = default;
// called when session should be closed by server signal
virtual void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr<ISessionClient>) = 0;
};

class TKqpSessionCommon : public TEndpointObj {
public:
TKqpSessionCommon(const TStringType& sessionId, const TStringType& endpoint,
Expand Down Expand Up @@ -55,6 +64,12 @@ class TKqpSessionCommon : public TEndpointObj {
static std::function<void(TKqpSessionCommon*)>
GetSmartDeleter(std::shared_ptr<ISessionClient> client);

// Shoult be called under session pool lock
void UpdateServerCloseHandler(IServerCloseHandler*);

// Called asynchronously from grpc thread.
void CloseFromServer(std::weak_ptr<ISessionClient> client) noexcept;

protected:
TAdaptiveLock Lock_;

Expand All @@ -64,15 +79,20 @@ class TKqpSessionCommon : public TEndpointObj {
const bool IsOwnedBySessionPool_;

EState State_;
TInstant TimeToTouch_;
// This time is used during async close session handling which does not lock the session
// so we need to be able to read this value atomicaly
std::atomic<TInstant> TimeToTouch_;
TInstant TimeInPast_;
// Is used to implement progressive timeout for settler keep alive call
TDuration TimeInterval_;

std::atomic<IServerCloseHandler*> CloseHandler_;
// Indicate session was in active state, but state was changed
// (need to decrement active session counter)
// TODO: suboptimal because need lock for atomic change from interceptor
// Rewrite with bit field
bool NeedUpdateActiveCounter_;

};

} // namespace NYdb
Loading

0 comments on commit f322990

Please sign in to comment.