Skip to content

Commit 1c5db7d

Browse files
authored
Merge secrets fixes to stable-25-3-1 (#26705)
2 parents 9f899ed + fc387f1 commit 1c5db7d

File tree

29 files changed

+1012
-257
lines changed

29 files changed

+1012
-257
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2263,7 +2263,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
22632263
TActorSetupCmd(finalize, TMailboxType::HTSwap, appData->UserPoolId)));
22642264

22652265
if (appData->FeatureFlags.GetEnableSchemaSecrets()) {
2266-
auto describeSchemaSecretsService = NKqp::CreateDescribeSchemaSecretsService();
2266+
auto describeSchemaSecretsService = NKqp::TDescribeSchemaSecretsServiceFactory().CreateService();
22672267
setup->LocalServices.push_back(std::make_pair(
22682268
NKqp::MakeKqpDescribeSchemaSecretServiceId(NodeId),
22692269
TActorSetupCmd(describeSchemaSecretsService, TMailboxType::HTSwap, appData->UserPoolId)));

ydb/core/kqp/common/events/script_executions.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/protos/kqp.pb.h>
44
#include <ydb/core/protos/kqp_stats.pb.h>
55
#include <ydb/core/protos/kqp_physical.pb.h>
6+
#include <ydb/library/aclib/aclib.h>
67
#include <yql/essentials/public/issue/yql_issue.h>
78
#include <ydb/public/api/protos/ydb_operation.pb.h>
89
#include <ydb/public/api/protos/ydb_query.pb.h>
@@ -281,7 +282,7 @@ struct TEvSaveScriptExternalEffectRequest : public TEventLocal<TEvSaveScriptExte
281282
TString Database;
282283

283284
TString CustomerSuppliedId;
284-
TString UserToken;
285+
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
285286
std::vector<NKqpProto::TKqpExternalSink> Sinks;
286287
std::vector<TString> SecretNames;
287288
};
@@ -386,7 +387,7 @@ struct TEvSaveScriptFinalStatusResponse : public TEventLocal<TEvSaveScriptFinalS
386387
bool OperationAlreadyFinalized = false;
387388
bool WaitRetry = false;
388389
TString CustomerSuppliedId;
389-
TString UserToken;
390+
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
390391
std::vector<NKqpProto::TKqpExternalSink> Sinks;
391392
std::vector<TString> SecretNames;
392393
Ydb::StatusIds::StatusCode Status;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
946946
}
947947

948948
void GetSecretsSnapshot() {
949-
RegisterDescribeSecretsActor(this->SelfId(), UserToken ? UserToken->GetUserSID() : "", SecretNames, this->ActorContext().ActorSystem());
949+
RegisterDescribeSecretsActor(this->SelfId(), UserToken, Database, SecretNames, this->ActorContext().ActorSystem());
950950
}
951951

952952
void GetResourcesSnapshot() {

ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp

Lines changed: 219 additions & 71 deletions
Large diffs are not rendered by default.

ydb/core/kqp/federated_query/kqp_federated_query_actors.h

Lines changed: 98 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22

33
#include <ydb/core/kqp/common/events/script_executions.h>
44
#include <ydb/core/protos/flat_scheme_op.pb.h>
5+
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
6+
#include <ydb/core/tx/tx_proxy/proxy.h>
7+
#include <ydb/core/tx/schemeshard/schemeshard.h>
8+
#include <ydb/core/tx/scheme_board/events.h>
59

610
#include <ydb/library/actors/core/actor.h>
711
#include <ydb/library/actors/core/actor_bootstrapped.h>
812
#include <ydb/library/aclib/aclib.h>
9-
#include <library/cpp/threading/future/future.h>
1013

11-
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
12-
#include <ydb/core/tx/tx_proxy/proxy.h>
13-
#include <ydb/core/tx/schemeshard/schemeshard.h>
14+
#include <library/cpp/threading/future/future.h>
1415

1516
namespace NKikimr::NKqp {
1617

@@ -24,60 +25,127 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped<TDescrib
2425
struct TEvResolveSecret : public NActors::TEventLocal<TEvResolveSecret, EvResolveSecret> {
2526
public:
2627
TEvResolveSecret(
27-
const TString& ownerUserId,
28-
const TString& secretName,
28+
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
29+
const TString& database,
30+
const TVector<TString>& secretNames,
2931
NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise
3032
)
31-
: UserToken(NACLib::TUserToken{ownerUserId, TVector<NACLib::TSID>{}})
32-
, SecretName(secretName)
33+
: UserToken(userToken)
34+
, Database(database)
35+
, SecretNames(secretNames)
3336
, Promise(promise)
3437
{
38+
Y_ENSURE(!Database.empty(), "Database name must be set in secret requests");
3539
}
3640

3741
public:
38-
const NACLib::TUserToken UserToken;
39-
const TString SecretName;
42+
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
43+
const TString Database;
44+
const TVector<TString> SecretNames;
4045
NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> Promise;
4146
};
4247

48+
private:
49+
struct TVersionedSecret {
50+
ui64 SecretVersion = 0;
51+
ui64 PathId = 0;
52+
TString Name;
53+
TString Value;
54+
};
55+
56+
struct TResponseContext {
57+
using TIncomingOrderId = ui64;
58+
THashMap<TString, TIncomingOrderId> Secrets;
59+
NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> Result;
60+
size_t FilledSecretsCnt = 0;
61+
};
62+
4363
private:
4464
STRICT_STFUNC(StateWait,
45-
hFunc(TEvResolveSecret, Handle);
46-
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
47-
hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
65+
hFunc(TEvResolveSecret, HandleIncomingRequest);
66+
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse);
67+
hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleSchemeShardResponse);
68+
hFunc(TSchemeBoardEvents::TEvNotifyDelete, HandleNotifyDelete);
69+
hFunc(TSchemeBoardEvents::TEvNotifyUpdate, HandleNotifyUpdate);
4870
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
4971
)
5072

51-
void Handle(TEvResolveSecret::TPtr& ev);
52-
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
53-
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev);
54-
void FillResponse(const ui64 requestId, const TEvDescribeSecretsResponse::TDescription& response);
55-
void SaveIncomingRequestInfo(const TEvResolveSecret& req);
56-
void SendSchemeCacheRequest(const TString& secretName);
73+
void HandleIncomingRequest(TEvResolveSecret::TPtr& ev);
74+
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
75+
void HandleSchemeShardResponse(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev);
76+
void HandleNotifyDelete(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev);
77+
void HandleNotifyUpdate(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev);
78+
79+
void FillResponse(const ui64& requestId, const TEvDescribeSecretsResponse::TDescription& response);
80+
void SaveIncomingRequestInfo(const TEvResolveSecret& ev);
81+
void SendSchemeCacheRequests(const TEvResolveSecret& ev);
82+
bool LocalCacheHasActualVersion(const TVersionedSecret& secret, const ui64& cacheSecretVersion);
83+
bool LocalCacheHasActualObject(const TVersionedSecret& secret, const ui64& cacheSecretPathId);
84+
bool HandleSchemeCacheErrorsIfAny(const ui64& requestId, NSchemeCache::TSchemeCacheNavigate& result);
85+
void FillResponseIfFinished(const ui64& requestId, const TResponseContext& responseCtx);
5786

5887
public:
5988
TDescribeSchemaSecretsService() = default;
6089

6190
void Bootstrap();
6291

63-
private:
64-
struct TVersionedSecret {
65-
ui64 Version;
66-
TString Value;
92+
public:
93+
// For tests only
94+
class ISecretUpdateListener : public TThrRefBase {
95+
public:
96+
virtual void HandleNotifyDelete(const TString& secretName) = 0;
97+
virtual ~ISecretUpdateListener() = default;
6798
};
99+
void SetSecretUpdateListener(ISecretUpdateListener* secretUpdateListener) {
100+
SecretUpdateListener = secretUpdateListener;
101+
}
68102

103+
private:
69104
ui64 LastCookie = 0;
70-
THashMap<ui64, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription>> ResolveInFlight;
71-
THashMap<ui64, TString> SecretNameInFlight;
72-
THashMap<TString, TVersionedSecret> SecretNameToValue;
105+
THashMap<ui64, TResponseContext> ResolveInFlight;
106+
THashMap<TString, TVersionedSecret> VersionedSecrets;
107+
THashMap<TString, TActorId> SchemeBoardSubscribers;
108+
ISecretUpdateListener* SecretUpdateListener;
73109
};
74110

75-
IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise);
111+
void RegisterDescribeSecretsActor(
112+
const NActors::TActorId& replyActorId,
113+
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
114+
const TString& database,
115+
const std::vector<TString>& secretIds,
116+
NActors::TActorSystem* actorSystem
117+
);
118+
119+
NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(
120+
const NKikimrSchemeOp::TAuth& authDescription,
121+
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
122+
const TString& database,
123+
TActorSystem* actorSystem
124+
);
76125

77-
void RegisterDescribeSecretsActor(const TActorId& replyActorId, const TString& ownerUserId, const std::vector<TString>& secretIds, TActorSystem* actorSystem);
126+
IActor* CreateDescribeSchemaSecretsService();
78127

79-
NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(const NKikimrSchemeOp::TAuth& authDescription, const TString& ownerUserId, TActorSystem* actorSystem);
128+
class IDescribeSchemaSecretsServiceFactory {
129+
public:
130+
using TPtr = std::shared_ptr<IDescribeSchemaSecretsServiceFactory>;
80131

81-
IActor* CreateDescribeSchemaSecretsService();
132+
virtual IActor* CreateService() = 0;
133+
virtual ~IDescribeSchemaSecretsServiceFactory() = default;
134+
};
135+
136+
class TDescribeSchemaSecretsServiceFactory : public IDescribeSchemaSecretsServiceFactory {
137+
public:
138+
IActor* CreateService() override;
139+
};
140+
141+
NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(
142+
const TVector<TString>& secretNames,
143+
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
144+
const TString& database,
145+
TActorSystem* actorSystem
146+
);
147+
148+
bool UseSchemaSecrets(const NKikimr::TFeatureFlags& flags, const TVector<TString>& secretNames);
149+
bool UseSchemaSecrets(const NKikimr::TFeatureFlags& flags, const TString& secretName);
82150

83151
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)