Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++ SDK] Remove session from session pool if stream session was closed by server side. #13199

Merged
merged 6 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 217 additions & 4 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>

#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/base/counters.h>
#include <library/cpp/threading/local_executor/local_executor.h>

#include <fmt/format.h>

Expand Down Expand Up @@ -54,7 +56,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
WaitForZeroSessions(counters);
}

Y_UNIT_TEST(QueryOnClosedSession) {
void DoClosedSessionRemovedWhileActiveTest(bool withQuery) {
auto kikimr = DefaultKikimrRunner();
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
Expand All @@ -75,21 +77,134 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {

UNIT_ASSERT(allDoneOk);

auto execResult = session.ExecuteQuery("SELECT 1;",
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
if (withQuery) {
auto execResult = session.ExecuteQuery("SELECT 1;",
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION);
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION);
}
}
// closed session must be removed from session pool
{
auto result = db.GetSession().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT(result.GetSession().GetId() != id);
}
UNIT_ASSERT_VALUES_EQUAL(db.GetActiveSessionCount(), 0);
}
WaitForZeroSessions(counters);
}

Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithQuery) {
// - Session is active (user gfot it)
// - server close it
// - user executes query (got BAD SESSION)
// - session should be removed from pool
DoClosedSessionRemovedWhileActiveTest(true);
}

/* Not implemented in the sdk
Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithoutQuery) {
// - Session is active (user gfot it)
// - server close it
// - user do not executes any query
// - session should be removed from pool
DoClosedSessionRemovedWhileActiveTest(false);
}
*/
// Copy paste from table service but with some modifications for query service
// Checks read iterators/session/sdk counters have expected values
Y_UNIT_TEST(CloseSessionsWithLoad) {
auto kikimr = std::make_shared<TKikimrRunner>();
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);

NYdb::NQuery::TQueryClient db = kikimr->GetQueryClient();

const ui32 SessionsCount = 50;
const TDuration WaitDuration = TDuration::Seconds(1);

TVector<NYdb::NQuery::TQueryClient::TSession> sessions;
for (ui32 i = 0; i < SessionsCount; ++i) {
auto sessionResult = db.GetSession().GetValueSync();
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());

sessions.push_back(sessionResult.GetSession());
}

NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1);
NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable {
if (id == (i32)sessions.size()) {
Sleep(WaitDuration);
Cerr << "start sessions close....." << Endl;
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr->GetEndpoint());
for (ui32 i = 0; i < sessions.size(); ++i) {
bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, sessions[i].GetId(), Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);
}

Cerr << "finished sessions close....." << Endl;
auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb");

ui64 pendingCompilations = 0;
do {
Sleep(WaitDuration);
pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val();
Cerr << "still compiling... " << pendingCompilations << Endl;
} while (pendingCompilations != 0);

ui64 pendingSessions = 0;
do {
Sleep(WaitDuration);
pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val();
Cerr << "still active sessions ... " << pendingSessions << Endl;
} while (pendingSessions != 0);

return;
}

auto session = sessions[id];
TMaybe<TTransaction> tx;

while (true) {
if (tx) {
auto result = tx->Commit().GetValueSync();
if (!result.IsSuccess()) {
return;
}

tx = {};
continue;
}

auto query = Sprintf(R"(
SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0;
SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1;
SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2;
SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3;
SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4;
SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5;

UPSERT INTO `/Root/EightShard` (Key, Text) VALUES
(%2$dul, "New");
)", RandomNumber<ui32>(), RandomNumber<ui32>());

auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync();
if (!result.IsSuccess()) {
dcherednik marked this conversation as resolved.
Show resolved Hide resolved
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_SESSION);
Cerr << "received non-success status for session " << id << Endl;
return;
}

tx = result.GetTransaction();
}
}, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
}

Y_UNIT_TEST(PeriodicTaskInSessionPool) {
auto kikimr = DefaultKikimrRunner();
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
Expand Down Expand Up @@ -169,6 +284,104 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
WaitForZeroSessions(counters);
}

// Check closed session removed while its in the session pool
Y_UNIT_TEST(ClosedSessionRemovedFromPool) {
auto kikimr = DefaultKikimrRunner();
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);

{
auto db = kikimr.GetQueryClient();

TString id;
{
auto result = db.GetSession().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT(result.GetSession().GetId());
auto session = result.GetSession();
id = session.GetId();
}

bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk);

Sleep(TDuration::Seconds(5));

UNIT_ASSERT(allDoneOk);
{
auto result = db.GetSession().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
auto newSession = result.GetSession();
UNIT_ASSERT_C(newSession.GetId() != id, "closed id: " << id << " new id: " << newSession.GetId());

auto execResult = newSession.ExecuteQuery("SELECT 1;",
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
}
}
WaitForZeroSessions(counters);
}

// Attempt to trigger simultanous server side close and return session
// From sdk perspective check no dataraces
Y_UNIT_TEST(ReturnAndCloseSameTime) {
auto kikimr = DefaultKikimrRunner();
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);

size_t iterations = 999;
auto db = kikimr.GetQueryClient();

NPar::LocalExecutor().RunAdditionalThreads(2);
while (iterations--) {
auto lim = iterations % 33;
TVector<NYdb::NQuery::TQueryClient::TSession> sessions;
TVector<TString> sids;
sessions.reserve(lim);
sids.reserve(lim);
for (size_t i = 0; i < lim; ++i) {
auto sessionResult = db.GetSession().GetValueSync();
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());

sessions.push_back(sessionResult.GetSession());
sids.push_back(sessions.back().GetId());
}

if (iterations & 1) {
auto rng = std::default_random_engine {};
std::ranges::shuffle(sids, rng);
}

NPar::LocalExecutor().ExecRange([sessions{std::move(sessions)}, sids{std::move(sids)}, clientConfig](int id) mutable{
if (id == 0) {
for (size_t i = 0; i < sessions.size(); i++) {
auto s = std::move(sessions[i]);
auto execResult = s.ExecuteQuery("SELECT 1;",
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
switch (execResult.GetStatus()) {
case EStatus::SUCCESS:
case EStatus::BAD_SESSION:
break;
default:
UNIT_ASSERT_C(false, "unexpected status: " << execResult.GetStatus());
}
}
} else if (id == 1) {
for (size_t i = 0; i < sids.size(); i++) {
bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, sids[i], Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);
}
} else {
Y_ABORT_UNLESS(false, "unexpected thread cxount");
}
}, 0, 2, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
}

WaitForZeroSessions(counters);
}

Y_UNIT_TEST(StreamExecuteQueryPure) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ TKqpSessionCommon::TKqpSessionCommon(
, State_(S_STANDALONE)
, TimeToTouch_(TInstant::Now())
, TimeInPast_(TInstant::Now())
, CloseHandler_(nullptr)
, NeedUpdateActiveCounter_(false)
{}

Expand Down Expand Up @@ -115,7 +116,7 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval,
if (updateTimeInPast) {
TimeInPast_ = now;
}
TimeToTouch_ = now + interval;
TimeToTouch_.store(now + interval, std::memory_order_relaxed);
}
}

Expand All @@ -126,11 +127,11 @@ void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval,
if (updateTimeInPast) {
TimeInPast_ = now;
}
TimeToTouch_ = now + interval;
TimeToTouch_.store(now + interval, std::memory_order_relaxed);
}

TInstant TKqpSessionCommon::GetTimeToTouchFast() const {
return TimeToTouch_;
return TimeToTouch_.load(std::memory_order_relaxed);
}

TInstant TKqpSessionCommon::GetTimeInPastFast() const {
Expand All @@ -146,6 +147,24 @@ TDuration TKqpSessionCommon::GetTimeInterval() const {
return TimeInterval_;
}

void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) {
CloseHandler_.store(handler);
}

void TKqpSessionCommon::CloseFromServer(std::weak_ptr<ISessionClient> client) noexcept {
auto strong = client.lock();
if (!strong) {
// Session closed on the server after stopping client - do nothing
// moreover pool maybe destoyed now
return;
}

IServerCloseHandler* h = CloseHandler_.load();
if (h) {
h->OnCloseSession(this, strong);
}
}

////////////////////////////////////////////////////////////////////////////////

std::function<void(TKqpSessionCommon*)> TKqpSessionCommon::GetSmartDeleter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ namespace NYdb {
////////////////////////////////////////////////////////////////////////////////
ui64 GetNodeIdFromSession(const TStringType& sessionId);

class TKqpSessionCommon;

class IServerCloseHandler {
public:
virtual ~IServerCloseHandler() = default;
// called when session should be closed by server signal
virtual void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr<ISessionClient>) = 0;
};

class TKqpSessionCommon : public TEndpointObj {
public:
TKqpSessionCommon(const TStringType& sessionId, const TStringType& endpoint,
Expand Down Expand Up @@ -55,6 +64,12 @@ class TKqpSessionCommon : public TEndpointObj {
static std::function<void(TKqpSessionCommon*)>
GetSmartDeleter(std::shared_ptr<ISessionClient> client);

// Shoult be called under session pool lock
void UpdateServerCloseHandler(IServerCloseHandler*);

// Called asynchronously from grpc thread.
void CloseFromServer(std::weak_ptr<ISessionClient> client) noexcept;

protected:
TAdaptiveLock Lock_;

Expand All @@ -64,15 +79,20 @@ class TKqpSessionCommon : public TEndpointObj {
const bool IsOwnedBySessionPool_;

EState State_;
TInstant TimeToTouch_;
// This time is used during async close session handling which does not lock the session
// so we need to be able to read this value atomicaly
std::atomic<TInstant> TimeToTouch_;
TInstant TimeInPast_;
// Is used to implement progressive timeout for settler keep alive call
TDuration TimeInterval_;

std::atomic<IServerCloseHandler*> CloseHandler_;
// Indicate session was in active state, but state was changed
// (need to decrement active session counter)
// TODO: suboptimal because need lock for atomic change from interceptor
// Rewrite with bit field
bool NeedUpdateActiveCounter_;

};

} // namespace NYdb
Loading
Loading