diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index c0df2454b5de..8567774e992f 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -627,10 +627,6 @@ IActor* CreateDiscoverer( return new TDiscoverer(f, database, replyTo, cacheId); } -IActor* CreateDiscoveryCache() { - return new NDiscoveryPrivate::TDiscoveryCache(); -} - IActor* CreateDiscoveryCache(const TString& endpointId) { return new NDiscoveryPrivate::TDiscoveryCache(endpointId); } diff --git a/ydb/core/discovery/discovery.h b/ydb/core/discovery/discovery.h index c4da9e921592..9bcf43d8036f 100644 --- a/ydb/core/discovery/discovery.h +++ b/ydb/core/discovery/discovery.h @@ -75,7 +75,6 @@ IActor* CreateDiscoverer( const TActorId& cacheId); // Used to reduce number of requests to Board -IActor* CreateDiscoveryCache(); -IActor* CreateDiscoveryCache(const TString& endpointId); +IActor* CreateDiscoveryCache(const TString& endpointId = {}); } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index f592064462e6..82fde19e6237 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -48,6 +48,7 @@ #include #include +#include #include #include @@ -1721,8 +1722,6 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se desc->Port = kakfaConfig.GetListeningPort(); desc->Ssl = kakfaConfig.HasSslCertificate(); - TVector services = {"datastreams", "pq", "pqv1"}; - desc->ServedServices.insert(desc->ServedServices.end(), services.begin(), services.end()); desc->EndpointId = NGRpcService::KafkaEndpointId; } @@ -2774,10 +2773,18 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu settings.CertificateFile = Config.GetKafkaProxyConfig().GetCert(); settings.PrivateKeyFile = Config.GetKafkaProxyConfig().GetKey(); + if (Config.GetKafkaProxyConfig().GetEnableEndpointDiscovery()) { + setup->LocalServices.emplace_back( + NKafka::MakeKafkaDiscoveryCacheID(), + TActorSetupCmd(CreateDiscoveryCache(NGRpcService::KafkaEndpointId), + TMailboxType::HTSwap, appData->UserPoolId) + ); + } setup->LocalServices.emplace_back( TActorId(), - TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()), - TMailboxType::HTSwap, appData->UserPoolId) + TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig(), + NKafka::MakeKafkaDiscoveryCacheID()), + TMailboxType::HTSwap, appData->UserPoolId) ); IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters}); diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index c97c0ffe7d57..b140e4cf5835 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -51,6 +51,8 @@ struct TContext { NKikimr::NPQ::TRlContext RlContext; + TActorId DiscoveryCacheActor; + bool Authenticated() { return AuthenticationStep == SUCCESS; } }; diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 2981f2b61a8f..87f1fd2204da 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -83,7 +83,8 @@ class TKafkaConnection: public TActorBootstrapped, public TNet TKafkaConnection(const TActorId& listenerActorId, TIntrusivePtr socket, TNetworkConfig::TSocketAddressType address, - const NKikimrConfig::TKafkaProxyConfig& config) + const NKikimrConfig::TKafkaProxyConfig& config, + const TActorId& discoveryCacheActorId) : ListenerActorId(listenerActorId) , Socket(std::move(socket)) , Address(address) @@ -91,9 +92,11 @@ class TKafkaConnection: public TActorBootstrapped, public TNet , Step(SIZE_READ) , Demand(NoDemand) , InflightSize(0) - , Context(std::make_shared(config)) { + , Context(std::make_shared(config)) + { SetNonBlock(); IsSslRequired = Socket->IsSslSupported(); + Context->DiscoveryCacheActor = discoveryCacheActorId; } void Bootstrap() { @@ -318,7 +321,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet TMessagePtr Cast(std::shared_ptr& request) { return TMessagePtr(request->Buffer, request->Message); } - + bool ProcessRequest(const TActorContext& ctx) { KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize << ", Size=" << Request->Size); @@ -339,7 +342,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet if (Request->Header.ClientId.has_value() && Request->Header.ClientId != "") { Context->KafkaClient = Request->Header.ClientId.value(); } - + switch (Request->Header.RequestApiKey) { case PRODUCE: HandleMessage(&Request->Header, Cast(Request), ctx); @@ -372,7 +375,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet case FETCH: HandleMessage(&Request->Header, Cast(Request)); break; - + case JOIN_GROUP: HandleMessage(&Request->Header, Cast(Request), ctx); break; @@ -388,11 +391,11 @@ class TKafkaConnection: public TActorBootstrapped, public TNet case HEARTBEAT: HandleMessage(&Request->Header, Cast(Request), ctx); break; - + case FIND_COORDINATOR: HandleMessage(&Request->Header, Cast(Request)); break; - + case OFFSET_FETCH: HandleMessage(&Request->Header, Cast(Request)); break; @@ -481,7 +484,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet void HandleKillReadSession() { if (ReadSessionActorId) { Send(ReadSessionActorId, new TEvents::TEvPoison()); - + TActorId emptyActor; ReadSessionActorId = emptyActor; } @@ -714,13 +717,14 @@ class TKafkaConnection: public TActorBootstrapped, public TNet bool RequireAuthentication(EApiKey apiKey) { bool configuredToAuthenticate = NKikimr::AppData()->EnforceUserTokenRequirement; - bool apiKeyRequiresAuthentication = !(EApiKey::API_VERSIONS == apiKey || - EApiKey::SASL_HANDSHAKE == apiKey || + bool apiKeyRequiresAuthentication = !(EApiKey::API_VERSIONS == apiKey || + EApiKey::SASL_HANDSHAKE == apiKey || EApiKey::SASL_AUTHENTICATE == apiKey); - + return configuredToAuthenticate && apiKeyRequiresAuthentication; } + void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) { if (event->Get()->Read) { if (!CloseConnection) { @@ -784,8 +788,9 @@ class TKafkaConnection: public TActorBootstrapped, public TNet NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId, TIntrusivePtr socket, TNetworkConfig::TSocketAddressType address, - const NKikimrConfig::TKafkaProxyConfig& config) { - return new TKafkaConnection(listenerActorId, std::move(socket), std::move(address), config); + const NKikimrConfig::TKafkaProxyConfig& config, + const TActorId& discoveryCacheActorId) { + return new TKafkaConnection(listenerActorId, std::move(socket), std::move(address), config, discoveryCacheActorId); } } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.h b/ydb/core/kafka_proxy/kafka_connection.h index 68b9e237ef29..344b6f1654ab 100644 --- a/ydb/core/kafka_proxy/kafka_connection.h +++ b/ydb/core/kafka_proxy/kafka_connection.h @@ -12,6 +12,7 @@ using namespace NKikimr::NRawSocket; NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId, TIntrusivePtr socket, TNetworkConfig::TSocketAddressType address, - const NKikimrConfig::TKafkaProxyConfig& config); + const NKikimrConfig::TKafkaProxyConfig& config, + const TActorId& discoveryCacheActorId); } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h index 494504937402..63b1b42f56e3 100644 --- a/ydb/core/kafka_proxy/kafka_listener.h +++ b/ydb/core/kafka_proxy/kafka_listener.h @@ -7,11 +7,20 @@ namespace NKafka { using namespace NKikimr::NRawSocket; -inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config) { + +TActorId MakeKafkaDiscoveryCacheID() { + static const char x[12] = "kafka_dsc_c"; + return TActorId(0, TStringBuf(x, 12)); +} + +inline NActors::IActor* CreateKafkaListener( + const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config, + const TActorId& discoveryCacheActorId +) { return CreateSocketListener( poller, settings, [=](const TActorId& listenerActorId, TIntrusivePtr socket, TNetworkConfig::TSocketAddressType address) { - return CreateKafkaConnection(listenerActorId, socket, address, config); + return CreateKafkaConnection(listenerActorId, socket, address, config, discoveryCacheActorId); }, NKikimrServices::EServiceKikimr::KAFKA_PROXY, EErrorAction::Abort); } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index bae674413882..e20dfa356be7 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1101,7 +1101,8 @@ namespace Tests { settings.SslCertificatePem = Settings->AppConfig->GetKafkaProxyConfig().GetSslCertificate(); } - IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig()); + IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig(), + TActorId{}); TActorId actorId = Runtime->Register(actor, nodeIdx, userPoolId); Runtime->RegisterService(TActorId{}, actorId, nodeIdx); @@ -1123,9 +1124,6 @@ namespace Tests { rootDomains.emplace_back("/" + domain->Name); } desc->ServedDatabases.insert(desc->ServedDatabases.end(), rootDomains.begin(), rootDomains.end()); - - TVector grpcServices = {"datastreams", "pq", "pqv1"}; - desc->ServedServices.insert(desc->ServedServices.end(), grpcServices.begin(), grpcServices.end()); Runtime->GetActorSystem(0)->Register(NGRpcService::CreateGrpcEndpointPublishActor(desc.Get()), TMailboxType::ReadAsFilled, appData.UserPoolId); } }