diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index 60fcbf49b62f..8567774e992f 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -184,7 +184,6 @@ namespace NDiscovery { if (!CheckEndpointId(endpointId, entry)) { continue; } - if (entry.GetSsl()) { AddEndpoint(cachedMessageSsl, statesSsl, entry); } else { @@ -200,7 +199,6 @@ namespace NDiscovery { cachedMessageSsl.set_self_location(location); } } - return {SerializeResult(cachedMessage), SerializeResult(cachedMessageSsl), std::move(infoEntries)}; } } @@ -235,6 +233,7 @@ namespace NDiscoveryPrivate { THashMap> Requested; bool Scheduled = false; + TMaybe EndpointId; auto Request(const TString& database) { auto result = Requested.emplace(database, TVector()); @@ -279,7 +278,8 @@ namespace NDiscoveryPrivate { currentCachedMessage = std::make_shared( NDiscovery::CreateCachedMessage( - currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, {}, NameserviceResponse) + currentCachedMessage->InfoEntries, std::move(msg->Updates), + {}, EndpointId.GetOrElse({}), NameserviceResponse) ); auto it = Requested.find(path); @@ -293,7 +293,8 @@ namespace NDiscoveryPrivate { const auto& path = msg->Path; auto newCachedData = std::make_shared( - NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, {}, NameserviceResponse) + NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), + {}, EndpointId.GetOrElse({}), NameserviceResponse) ); newCachedData->Status = msg->Status; @@ -372,6 +373,11 @@ namespace NDiscoveryPrivate { } public: + TDiscoveryCache() = default; + TDiscoveryCache(const TString& endpointId) + : EndpointId(endpointId) + { + } static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR; } @@ -523,7 +529,7 @@ class TDiscoverer: public TActorBootstrapped { return true; default: return true; - } + } } void MaybeReply() { @@ -621,8 +627,8 @@ IActor* 
CreateDiscoverer( return new TDiscoverer(f, database, replyTo, cacheId); } -IActor* CreateDiscoveryCache() { - return new NDiscoveryPrivate::TDiscoveryCache(); +IActor* CreateDiscoveryCache(const TString& endpointId) { + return new NDiscoveryPrivate::TDiscoveryCache(endpointId); } } diff --git a/ydb/core/discovery/discovery.h b/ydb/core/discovery/discovery.h index 2eab01dae01a..9bcf43d8036f 100644 --- a/ydb/core/discovery/discovery.h +++ b/ydb/core/discovery/discovery.h @@ -75,6 +75,6 @@ IActor* CreateDiscoverer( const TActorId& cacheId); // Used to reduce number of requests to Board -IActor* CreateDiscoveryCache(); +IActor* CreateDiscoveryCache(const TString& endpointId = {}); } diff --git a/ydb/core/driver_lib/run/config_parser.cpp b/ydb/core/driver_lib/run/config_parser.cpp index b4f65f69530e..c91796af6d5b 100644 --- a/ydb/core/driver_lib/run/config_parser.cpp +++ b/ydb/core/driver_lib/run/config_parser.cpp @@ -67,6 +67,7 @@ void TRunCommandConfigParser::SetupLastGetOptForConfigFiles(NLastGetopt::TOpts& opts.AddLongOption("grpc-file", "gRPC config file").OptionalArgument("PATH"); opts.AddLongOption("grpc-port", "enable gRPC server on port").RequiredArgument("PORT"); opts.AddLongOption("grpcs-port", "enable gRPC SSL server on port").RequiredArgument("PORT"); + opts.AddLongOption("kafka-port", "enable kafka proxy server on port").OptionalArgument("PORT"); opts.AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST"); opts.AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT"); opts.AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT"); @@ -165,6 +166,11 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu conf.SetSslPort(FromString(res.Get("grpcs-port"))); } + if (res.Has("kafka-port")) { + auto& conf = *Config.AppConfig.MutableKafkaProxyConfig(); + conf.SetListeningPort(FromString(res.Get("kafka-port"))); 
+ } + if (res.Has("grpc-public-host")) { auto& conf = *Config.AppConfig.MutableGRpcConfig(); conf.SetPublicHost(res.Get("grpc-public-host")); diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index d59d9315b129..6321f43e3e68 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -48,6 +48,7 @@ #include #include +#include #include #include @@ -1714,6 +1715,18 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se endpoints.push_back(std::move(desc)); } + if (Config.GetKafkaProxyConfig().GetEnableKafkaProxy()) { + const auto& kafkaConfig = Config.GetKafkaProxyConfig(); + TIntrusivePtr desc = new NGRpcService::TGrpcEndpointDescription(); + desc->Address = config.GetPublicHost() ? config.GetPublicHost() : address; + desc->Port = kafkaConfig.GetListeningPort(); + desc->Ssl = kafkaConfig.HasSslCertificate(); + + desc->EndpointId = NGRpcService::KafkaEndpointId; + endpoints.push_back(std::move(desc)); + + } + for (auto &sx : config.GetExtEndpoints()) { const TString &localAddress = sx.GetHost() ? (sx.GetHost() != "[::]" ?
sx.GetHost() : FQDNHostName()) : address; if (const ui32 port = sx.GetPort()) { @@ -2743,10 +2756,16 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu settings.PrivateKeyFile = Config.GetKafkaProxyConfig().GetKey(); setup->LocalServices.emplace_back( - TActorId(), - TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()), + NKafka::MakeKafkaDiscoveryCacheID(), + TActorSetupCmd(CreateDiscoveryCache(NGRpcService::KafkaEndpointId), TMailboxType::HTSwap, appData->UserPoolId) ); + setup->LocalServices.emplace_back( + TActorId(), + TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig(), + NKafka::MakeKafkaDiscoveryCacheID()), + TMailboxType::HTSwap, appData->UserPoolId) + ); IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters}); setup->LocalServices.emplace_back( diff --git a/ydb/core/grpc_services/grpc_endpoint.h b/ydb/core/grpc_services/grpc_endpoint.h index 0f107803f426..46f55add335f 100644 --- a/ydb/core/grpc_services/grpc_endpoint.h +++ b/ydb/core/grpc_services/grpc_endpoint.h @@ -28,5 +28,6 @@ inline TActorId CreateGrpcPublisherServiceActorId() { return actorId; } +const static TString KafkaEndpointId = "KafkaProxy"; } } diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 933c3238a503..3a07f1dd871e 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -51,9 +51,11 @@ struct TContext { NKikimr::NPQ::TRlContext RlContext; - bool Authenticated() { - return !RequireAuthentication || AuthenticationStep == SUCCESS; + bool Authenticated() { + return !RequireAuthentication || AuthenticationStep == SUCCESS; } + + TActorId DiscoveryCacheActor; }; template T> @@ -165,7 +167,9 @@ inline TString GetUserSerializedToken(std::shared_ptr context) { NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const 
ui64 correlationId, const TMessagePtr& message); NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message); -NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message); +NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, + const TMessagePtr& message, + const TActorId& discoveryCacheActor); NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context); NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie); NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message); diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index cf16a8de3e3b..b2cd1e00657e 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -2,14 +2,23 @@ #include #include +#include +#include namespace NKafka { +using namespace NKikimr; using namespace NKikimr::NGRpcProxy::V1; +TActorId MakeKafkaDiscoveryCacheID() { + static const char x[12] = "kafka_dsc_c"; + return TActorId(0, TStringBuf(x, 12)); +} + NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, - const TMessagePtr& message) { - return new TKafkaMetadataActor(context, correlationId, message); + const TMessagePtr& message, + const TActorId& discoveryCacheActor) { + return new TKafkaMetadataActor(context, correlationId, message, discoveryCacheActor); } void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { @@ -19,31 +28,91 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { if (WithProxy) { AddProxyNodeToBrokers(); - } + } else { + SendDiscoveryRequest(); - if (Message->Topics.size() == 0 && !WithProxy) { - AddCurrentNodeToBrokers(); + if 
(Message->Topics.size() == 0) { + NeedCurrentNode = true; + } } if (Message->Topics.size() != 0) { ProcessTopics(); } - + Become(&TKafkaMetadataActor::StateWork); RespondIfRequired(ctx); } -void TKafkaMetadataActor::AddCurrentNodeToBrokers() { +void TKafkaMetadataActor::SendDiscoveryRequest() { + Y_VERIFY_DEBUG(DiscoveryCacheActor); PendingResponses++; - Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest()); + Register(CreateDiscoverer(&MakeEndpointsBoardPath, Context->DatabasePath, SelfId(), DiscoveryCacheActor)); +} + + +void TKafkaMetadataActor::HandleDiscoveryError(TEvDiscovery::TEvError::TPtr& ev) { + PendingResponses--; + HaveError = true; + KAFKA_LOG_ERROR("Port discovery failed for database '" << Context->DatabasePath << "' with error '" << ev->Get()->Error + << "', request " << CorrelationId); + + RespondIfRequired(ActorContext()); +} + +void TKafkaMetadataActor::HandleDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev) { + PendingResponses--; + ProcessDiscoveryData(ev); + RespondIfRequired(ActorContext()); +} + +void TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev) { + bool expectSsl = Context->Config.HasSslCertificate(); + + Ydb::Discovery::ListEndpointsResponse leResponse; + Ydb::Discovery::ListEndpointsResult leResult; + TString const* cachedMessage; + if (expectSsl) { + cachedMessage = &ev->Get()->CachedMessageData->CachedMessageSsl; + } else { + cachedMessage = &ev->Get()->CachedMessageData->CachedMessage; + } + auto ok = leResponse.ParseFromString(*cachedMessage); + if (ok) { + ok = leResponse.operation().result().UnpackTo(&leResult); + } + if (!ok) { + KAFKA_LOG_ERROR("Port discovery failed, unable to parse discovery response for request " << CorrelationId); + HaveError = true; + return; + } + + for (auto& endpoint : leResult.endpoints()) { + Nodes.insert({endpoint.node_id(), {endpoint.address(), endpoint.port()}}); + } +} +
+void TKafkaMetadataActor::RequestICNodeCache() { + Y_ABORT_UNLESS(!FallbackToIcDiscovery); + FallbackToIcDiscovery = true; + PendingResponses++; + Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest()); +} + +void TKafkaMetadataActor::HandleNodesResponse( + NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, + const NActors::TActorContext& ctx +) { + Y_ABORT_UNLESS(FallbackToIcDiscovery); + for (const auto& [nodeId, index] : *ev->Get()->NodeIdsMapping) { + Nodes[nodeId] = {(*ev->Get()->Nodes)[index].Host, (ui32)Context->Config.GetListeningPort()}; + } + --PendingResponses; + RespondIfRequired(ctx); } void TKafkaMetadataActor::AddProxyNodeToBrokers() { - auto broker = TMetadataResponseData::TMetadataResponseBroker{}; - broker.NodeId = ProxyNodeId; - broker.Host = Context->Config.GetProxy().GetHostname(); - broker.Port = Context->Config.GetProxy().GetPort(); - Response->Brokers.emplace_back(std::move(broker)); + AddBroker(ProxyNodeId, Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort()); } void TKafkaMetadataActor::ProcessTopics() { @@ -70,34 +139,30 @@ void TKafkaMetadataActor::ProcessTopics() { } } -void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) { - auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId()); - Y_ABORT_UNLESS(!iter.IsEnd()); - auto host = (*ev->Get()->Nodes)[iter->second].Host; - KAFKA_LOG_D("Incoming TEvGetAllNodesInfoResponse. 
Host#: " << host); - - auto broker = TMetadataResponseData::TMetadataResponseBroker{}; - broker.NodeId = ctx.SelfID.NodeId(); - broker.Host = host; - broker.Port = Context->Config.GetListeningPort(); - Response->Brokers.emplace_back(std::move(broker)); - - --PendingResponses; - RespondIfRequired(ctx); -} - TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) { - KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user " << GetUsernameOrAnonymous(Context)); + KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << Context->UserToken->GetUserSID() << "'"); TGetPartitionsLocationRequest locationRequest{}; locationRequest.Topic = NormalizePath(Context->DatabasePath, topicRequest.Name.value()); - locationRequest.Token = GetUserSerializedToken(Context); + locationRequest.Token = Context->UserToken->GetSerializedToken(); locationRequest.Database = Context->DatabasePath; PendingResponses++; return Register(new TPartitionsLocationActor(locationRequest, SelfId())); -} +} + +TVector TKafkaMetadataActor::CheckTopicNodes(TEvLocationResponse* response) { + TVector partitionNodes; + for (const auto& part : response->Partitions) { + auto iter = Nodes.find(part.NodeId); + if (iter.IsEnd()) { + return {}; + } + partitionNodes.push_back(&iter->second); + } + return partitionNodes; +} void TKafkaMetadataActor::AddTopicError( TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode @@ -106,10 +171,15 @@ void TKafkaMetadataActor::AddTopicError( ErrorCode = errorCode; } -void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) { +void TKafkaMetadataActor::AddTopicResponse( + TMetadataResponseData::TMetadataResponseTopic& topic, + TEvLocationResponse* response, + const TVector& partitionNodes +) { topic.ErrorCode = NONE_ERROR; 
topic.Partitions.reserve(response->Partitions.size()); + auto nodeIter = partitionNodes.begin(); for (const auto& part : response->Partitions) { auto nodeId = WithProxy ? ProxyNodeId : part.NodeId; @@ -126,28 +196,24 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo if (!WithProxy) { auto ins = AllClusterNodes.insert(part.NodeId); if (ins.second) { - auto hostname = part.Hostname; + auto hostname = (*nodeIter)->Host; if (hostname.StartsWith(UnderlayPrefix)) { hostname = hostname.substr(sizeof(UnderlayPrefix) - 1); } - - auto broker = TMetadataResponseData::TMetadataResponseBroker{}; - broker.NodeId = part.NodeId; - broker.Host = hostname; - broker.Port = Context->Config.GetListeningPort(); - Response->Brokers.emplace_back(std::move(broker)); + AddBroker(part.NodeId, hostname, (*nodeIter)->Port); } } + ++nodeIter; } } -void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) { +void TKafkaMetadataActor::HandleLocationResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) { --PendingResponses; auto* r = ev->Get(); auto actorIter = TopicIndexes.find(ev->Sender); - Y_DEBUG_ABORT_UNLESS(!actorIter.IsEnd()); + Y_DEBUG_ABORT_UNLESS(!actorIter.IsEnd()); Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty()); if (actorIter.IsEnd()) { @@ -157,29 +223,79 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc if (actorIter->second.empty()) { KAFKA_LOG_CRIT("Corrupted state (empty actorId in mapping). 
Ignored location response, expect incomplete reply"); - return RespondIfRequired(ctx); } - + for (auto index : actorIter->second) { auto& topic = Response->Topics[index]; if (r->Status == Ydb::StatusIds::SUCCESS) { KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful"); - AddTopicResponse(topic, r); + PendingTopicResponses.insert(std::make_pair(index, ev->Release())); } else { KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString()); AddTopicError(topic, ConvertErrorCode(r->Status)); } } + RespondIfRequired(ctx); +} - RespondIfRequired(ActorContext()); +void TKafkaMetadataActor::AddBroker(ui64 nodeId, const TString& host, ui64 port) { + auto broker = TMetadataResponseData::TMetadataResponseBroker{}; + broker.NodeId = nodeId; + broker.Host = host; + broker.Port = port; + Response->Brokers.emplace_back(std::move(broker)); } void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) { - if (PendingResponses == 0) { + auto Respond = [&] { Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, ErrorCode)); Die(ctx); + }; + + if (HaveError) { + ErrorCode = EKafkaErrors::LISTENER_NOT_FOUND; + for (auto& topic : Response->Topics) { + AddTopicError(topic, ErrorCode); + } + Respond(); + return; } + if (PendingResponses != 0) { + return; + } + + if (NeedCurrentNode) { + auto nodeIter = Nodes.find(SelfId().NodeId()); + if (nodeIter.IsEnd()) { + // Node info was not found, request from IC nodes cache instead + RequestICNodeCache(); + return; + } + AddBroker(nodeIter->first, nodeIter->second.Host, nodeIter->second.Port); + NeedCurrentNode = false; + } + while (!PendingTopicResponses.empty()) { + auto& [index, ev] = *PendingTopicResponses.begin(); + auto& topic = Response->Topics[index]; + auto topicNodes = CheckTopicNodes(ev.Get()); + if (topicNodes.empty()) { + if (!FallbackToIcDiscovery) { + // Node info wasn't found via 
discovery, fallback to interconnect + RequestICNodeCache(); + return; + } else { + // Already tried both YDB discovery and interconnect, still couldn't find the node for partition. Throw error + KAFKA_LOG_ERROR("Could not discovery kafka port for topic '" << topic.Name); + AddTopicError(topic, EKafkaErrors::LISTENER_NOT_FOUND); + } + } else { + AddTopicResponse(topic, ev.Get(), topicNodes); + } + PendingTopicResponses.erase(PendingTopicResponses.begin()); + } + + Respond(); } TString TKafkaMetadataActor::LogPrefix() const { diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h index db6555e72423..553a4a6f7338 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h @@ -5,17 +5,22 @@ #include #include #include +#include +#include + namespace NKafka { class TKafkaMetadataActor: public NActors::TActorBootstrapped { public: - TKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message) + TKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message, + const TActorId& discoveryCacheActor) : Context(context) , CorrelationId(correlationId) , Message(message) , WithProxy(context->Config.HasProxy() && !context->Config.GetProxy().GetHostname().empty()) , Response(new TMetadataResponseData()) + , DiscoveryCacheActor(discoveryCacheActor) {} void Bootstrap(const NActors::TActorContext& ctx); @@ -23,21 +28,36 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped& nodes); void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode); void RespondIfRequired(const NActors::TActorContext& ctx); void AddProxyNodeToBrokers(); - void AddCurrentNodeToBrokers(); + void AddBroker(ui64 nodeId, const TString& host, ui64 port); + void RequestICNodeCache(); void ProcessTopics(); + void SendDiscoveryRequest(); + void 
ProcessDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev); + TVector CheckTopicNodes(TEvLocationResponse* response); STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { - HFunc(TEvLocationResponse, HandleResponse); + HFunc(TEvLocationResponse, HandleLocationResponse); HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse); + hFunc(NKikimr::TEvDiscovery::TEvDiscoveryData, HandleDiscoveryData); + hFunc(NKikimr::TEvDiscovery::TEvError, HandleDiscoveryError); } } @@ -55,6 +75,14 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped> TopicIndexes; THashSet AllClusterNodes; EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR; + + TActorId DiscoveryCacheActor; + bool NeedCurrentNode = false; + bool HaveError = false; + bool FallbackToIcDiscovery = false; + TMap> PendingTopicResponses; + + THashMap Nodes; }; } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 40f700a4b880..0a0b2aa29a44 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -83,7 +83,8 @@ class TKafkaConnection: public TActorBootstrapped, public TNet TKafkaConnection(const TActorId& listenerActorId, TIntrusivePtr socket, TNetworkConfig::TSocketAddressType address, - const NKikimrConfig::TKafkaProxyConfig& config) + const NKikimrConfig::TKafkaProxyConfig& config, + const TActorId& discoveryCacheActorId) : ListenerActorId(listenerActorId) , Socket(std::move(socket)) , Address(address) @@ -91,9 +92,11 @@ class TKafkaConnection: public TActorBootstrapped, public TNet , Step(SIZE_READ) , Demand(NoDemand) , InflightSize(0) - , Context(std::make_shared(config)) { + , Context(std::make_shared(config)) + { SetNonBlock(); IsSslRequired = Socket->IsSslSupported(); + Context->DiscoveryCacheActor = discoveryCacheActorId; } void Bootstrap() { @@ -271,7 +274,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet } void 
HandleMessage(TRequestHeaderData* header, const TMessagePtr& message) { - Register(CreateKafkaMetadataActor(Context, header->CorrelationId, message)); + Register(CreateKafkaMetadataActor(Context, header->CorrelationId, message, Context->DiscoveryCacheActor)); } void HandleMessage(const TRequestHeaderData* header, const TMessagePtr& message) { @@ -318,7 +321,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet TMessagePtr Cast(std::shared_ptr& request) { return TMessagePtr(request->Buffer, request->Message); } - + bool ProcessRequest(const TActorContext& ctx) { KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize << ", Size=" << Request->Size); @@ -372,7 +375,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet case FETCH: HandleMessage(&Request->Header, Cast(Request)); break; - + case JOIN_GROUP: HandleMessage(&Request->Header, Cast(Request), ctx); break; @@ -388,11 +391,11 @@ class TKafkaConnection: public TActorBootstrapped, public TNet case HEARTBEAT: HandleMessage(&Request->Header, Cast(Request), ctx); break; - + case FIND_COORDINATOR: HandleMessage(&Request->Header, Cast(Request)); break; - + case OFFSET_FETCH: HandleMessage(&Request->Header, Cast(Request)); break; @@ -481,7 +484,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet void HandleKillReadSession() { if (ReadSessionActorId) { Send(ReadSessionActorId, new TEvents::TEvPoison()); - + TActorId emptyActor; ReadSessionActorId = emptyActor; } @@ -720,6 +723,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet EApiKey::SASL_AUTHENTICATE == apiKey); } + void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) { if (event->Get()->Read) { if (!CloseConnection) { @@ -783,8 +787,9 @@ class TKafkaConnection: public TActorBootstrapped, public TNet NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId, TIntrusivePtr socket, 
TNetworkConfig::TSocketAddressType address, - const NKikimrConfig::TKafkaProxyConfig& config) { - return new TKafkaConnection(listenerActorId, std::move(socket), std::move(address), config); + const NKikimrConfig::TKafkaProxyConfig& config, + const TActorId& discoveryCacheActorId) { + return new TKafkaConnection(listenerActorId, std::move(socket), std::move(address), config, discoveryCacheActorId); } } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.h b/ydb/core/kafka_proxy/kafka_connection.h index 68b9e237ef29..344b6f1654ab 100644 --- a/ydb/core/kafka_proxy/kafka_connection.h +++ b/ydb/core/kafka_proxy/kafka_connection.h @@ -12,6 +12,7 @@ using namespace NKikimr::NRawSocket; NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId, TIntrusivePtr socket, TNetworkConfig::TSocketAddressType address, - const NKikimrConfig::TKafkaProxyConfig& config); + const NKikimrConfig::TKafkaProxyConfig& config, + const TActorId& discoveryCacheActorId); } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h index 494504937402..c8ef8ef069f8 100644 --- a/ydb/core/kafka_proxy/kafka_listener.h +++ b/ydb/core/kafka_proxy/kafka_listener.h @@ -7,11 +7,17 @@ namespace NKafka { using namespace NKikimr::NRawSocket; -inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config) { + +TActorId MakeKafkaDiscoveryCacheID(); + +inline NActors::IActor* CreateKafkaListener( + const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config, + const TActorId& discoveryCacheActorId +) { return CreateSocketListener( poller, settings, [=](const TActorId& listenerActorId, TIntrusivePtr socket, TNetworkConfig::TSocketAddressType address) { - return CreateKafkaConnection(listenerActorId, socket, address, config); + return CreateKafkaConnection(listenerActorId, 
socket, address, config, discoveryCacheActorId); }, NKikimrServices::EServiceKikimr::KAFKA_PROXY, EErrorAction::Abort); } diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp index b5e6d74394d3..2c41f8474d46 100644 --- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp +++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp @@ -1,5 +1,6 @@ #include #include + #include #include @@ -29,9 +30,11 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) { auto context = std::make_shared(Config); context->ConnectionId = edgeActor; + context->DatabasePath = "/Root"; context->UserToken = new NACLib::TUserToken("root@builtin", {}); - auto actorId = runtime->Register(new TKafkaMetadataActor(context, 1, TMessagePtr(std::make_shared(), request))); + auto actorId = runtime->Register(new TKafkaMetadataActor(context, 1, TMessagePtr(std::make_shared(), request), + NKafka::MakeKafkaDiscoveryCacheID())); runtime->EnableScheduleForActor(actorId); runtime->DispatchEvents(); Cerr << "Wait for response for topics: '"; @@ -43,7 +46,9 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) { } Y_UNIT_TEST(TopicMetadataGoodAndBad) { - NPersQueue::TTestServer server; + auto serverSettings = NKikimr::NPersQueueTests::PQSettings(0).SetDomainName("Root").SetNodeCount(1); + serverSettings.AppConfig->MutableKafkaProxyConfig()->SetEnableKafkaProxy(true); + NPersQueue::TTestServer server{serverSettings}; TString topicName = "rt3.dc1--topic"; TString topicName2 = "rt3.dc1--topic2"; TString topicPath = TString("/Root/PQ/") + topicName; @@ -51,6 +56,8 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) { ui32 totalPartitions = 5; server.AnnoyingClient->CreateTopic(topicName, totalPartitions); server.AnnoyingClient->CreateTopic(topicName2, totalPartitions * 2); + server.WaitInit("topic"); + auto edgeId = server.CleverServer->GetRuntime()->AllocateEdgeActor(); auto event = GetEvent(server, edgeId, {topicPath}); diff --git a/ydb/core/kafka_proxy/ut/port_discovery_ut.cpp 
b/ydb/core/kafka_proxy/ut/port_discovery_ut.cpp new file mode 100644 index 000000000000..bd8fbfe0aebc --- /dev/null +++ b/ydb/core/kafka_proxy/ut/port_discovery_ut.cpp @@ -0,0 +1,305 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + + +using namespace NKikimr; + +auto UnpackDiscoveryData(const TString& data) { + Ydb::Discovery::ListEndpointsResponse leResponse; + Ydb::Discovery::ListEndpointsResult leResult; + auto ok = leResponse.ParseFromString(data); + UNIT_ASSERT(ok); + ok = leResponse.operation().result().UnpackTo(&leResult); + UNIT_ASSERT(ok); + return leResult; +} + +class TFakeDiscoveryCache: public TActorBootstrapped { + std::shared_ptr CachedMessage; + +public: + TFakeDiscoveryCache(const Ydb::Discovery::ListEndpointsResult& leResult, bool triggerError) + { + if (!triggerError) { + Ydb::Discovery::ListEndpointsResponse response; + TString out; + auto deferred = response.mutable_operation(); + deferred->set_ready(true); + deferred->set_status(Ydb::StatusIds::SUCCESS); + + auto data = deferred->mutable_result(); + data->PackFrom(leResult); + + Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); + + TMap infoEntries; + infoEntries.insert(std::make_pair(SelfId(), TEvStateStorage::TBoardInfoEntry("/Root"))); + CachedMessage.reset(new NDiscovery::TCachedMessageData(out, "b", std::move(infoEntries))); + + } else { + CachedMessage.reset(new NDiscovery::TCachedMessageData("", "", {})); + } + } + + void Bootstrap() { + Become(&TFakeDiscoveryCache::StateWork); + } + + STATEFN(StateWork) { + Handle(ev); + } + void Handle(TAutoPtr& ev) { + Cerr << "Fake discovery cache: handle request\n"; + Send(ev->Sender, new TEvDiscovery::TEvDiscoveryData(CachedMessage), 0, ev->Cookie); + } +}; + +namespace NKafka::NTests { + Y_UNIT_TEST_SUITE(DiscoveryIsNotBroken) { + void CheckEndpointsInDiscovery(bool withSsl, bool expectKafkaEndpoints) { + auto pm = MakeSimpleShared(); + ui16 kafkaPort = pm->GetPort(); + auto 
serverSettings = NPersQueueTests::PQSettings(0).SetDomainName("Root").SetNodeCount(1); + serverSettings.AppConfig->MutableKafkaProxyConfig()->SetEnableKafkaProxy(true); + serverSettings.AppConfig->MutableKafkaProxyConfig()->SetListeningPort(kafkaPort); + if (withSsl) { + serverSettings.AppConfig->MutableKafkaProxyConfig()->SetSslCertificate("12345"); + } + NPersQueue::TTestServer server(serverSettings, true, {}, NActors::NLog::PRI_INFO, pm); + auto port = server.GrpcPort; + Cerr << "Run with port = " << port << ", kafka port = " << kafkaPort << Endl; + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + TActorId discoveryCacheActorID; + if (expectKafkaEndpoints) { + discoveryCacheActorID = runtime->Register(CreateDiscoveryCache(NGRpcService::KafkaEndpointId)); + } else { + discoveryCacheActorID = runtime->Register(CreateDiscoveryCache()); + } + auto discoverer = runtime->Register(CreateDiscoverer(&MakeEndpointsBoardPath, "/Root", edge, discoveryCacheActorID)); + Y_UNUSED(discoverer); + TAutoPtr handle; + auto* ev = runtime->GrabEdgeEvent(handle); + UNIT_ASSERT(ev); + auto discoveryData = UnpackDiscoveryData(ev->CachedMessageData->CachedMessage); + auto discoverySslData = UnpackDiscoveryData(ev->CachedMessageData->CachedMessageSsl); + + auto checkEndpoints = [&] (ui32 port, ui32 sslPort) { + if (port) { + UNIT_ASSERT_VALUES_EQUAL(discoveryData.endpoints_size(), 1); + UNIT_ASSERT_VALUES_EQUAL(discoveryData.endpoints(0).port(), port); + UNIT_ASSERT_VALUES_EQUAL(discoverySslData.endpoints_size(), 0); + } + if (sslPort) { + UNIT_ASSERT_VALUES_EQUAL(discoverySslData.endpoints_size(), 1); + UNIT_ASSERT_VALUES_EQUAL(discoverySslData.endpoints(0).port(), sslPort); + UNIT_ASSERT_VALUES_EQUAL(discoveryData.endpoints_size(), 0); + } + }; + if (expectKafkaEndpoints) { + if (withSsl) { + checkEndpoints(0, kafkaPort); + } else { + checkEndpoints(kafkaPort, 0); + } + } else { + checkEndpoints(port, 0); + } + } +
Y_UNIT_TEST(NoKafkaEndpointInDiscovery) { + CheckEndpointsInDiscovery(false, false); + } + + Y_UNIT_TEST(NoKafkaSslEndpointInDiscovery) { + CheckEndpointsInDiscovery(true, false); + } + + Y_UNIT_TEST(HaveKafkaEndpointInDiscovery) { + CheckEndpointsInDiscovery(false, true); + } + Y_UNIT_TEST(HaveKafkaSslEndpointInDiscovery) { + CheckEndpointsInDiscovery(true, true); + } + } + + Y_UNIT_TEST_SUITE(PublishKafkaEndpoints) { + Y_UNIT_TEST(HaveEndpointInLookup) { + auto pm = MakeSimpleShared(); + ui16 kafkaPort = pm->GetPort(); + auto serverSettings = NPersQueueTests::PQSettings(0).SetDomainName("Root").SetNodeCount(1); + serverSettings.AppConfig->MutableKafkaProxyConfig()->SetEnableKafkaProxy(true); + serverSettings.AppConfig->MutableKafkaProxyConfig()->SetListeningPort(kafkaPort); + NPersQueue::TTestServer server(serverSettings, true, {}, NActors::NLog::PRI_INFO, pm); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + runtime->Register(CreateBoardLookupActor(MakeEndpointsBoardPath("/Root"), edge, EBoardLookupMode::Second)); + TAutoPtr handle; + auto* ev = runtime->GrabEdgeEvent(handle); + UNIT_ASSERT(ev); + Cerr << "ev for path: " << ev->Path << ", is unknown: " << (ev->Status == TEvStateStorage::TEvBoardInfo::EStatus::Unknown) + << ", is unavailable: " << (ev->Status == TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable) << Endl; + UNIT_ASSERT(ev->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_VALUES_EQUAL(ev->InfoEntries.size(), 2); + bool hasKafkaPort = false; + for (const auto& [k, v] : ev->InfoEntries) { + NKikimrStateStorage::TEndpointBoardEntry entry; + UNIT_ASSERT(entry.ParseFromString(v.Payload)); + Cerr << "Got entry, actor: " << k.ToString() << ", entry: " << entry.DebugString() << Endl; + if (entry.GetPort() == kafkaPort) { + UNIT_ASSERT_STRINGS_EQUAL(entry.GetEndpointId(), NGRpcService::KafkaEndpointId); + hasKafkaPort = true; + } + } + UNIT_ASSERT(hasKafkaPort); + } + struct TMetarequestTestParams 
{ + NPersQueue::TTestServer Server; + ui64 KafkaPort; + NKikimrConfig::TKafkaProxyConfig KafkaConfig; + TString FullTopicName; + }; + + TMetarequestTestParams SetupServer(const TString shortTopicName) { + TStringBuilder fullTopicName; + fullTopicName << "rt3.dc1--" << shortTopicName; + auto pm = MakeSimpleShared(); + ui16 kafkaPort = pm->GetPort(); + auto serverSettings = NPersQueueTests::PQSettings(0).SetDomainName("Root").SetNodeCount(1); + serverSettings.AppConfig->MutableKafkaProxyConfig()->SetEnableKafkaProxy(true); + + serverSettings.AppConfig->MutableKafkaProxyConfig()->SetListeningPort(kafkaPort); + NPersQueue::TTestServer server(serverSettings, true, {}, NActors::NLog::PRI_INFO, pm); + + server.AnnoyingClient->CreateTopic(fullTopicName, 1); + server.WaitInit(shortTopicName); + + return {std::move(server), kafkaPort, serverSettings.AppConfig->GetKafkaProxyConfig(), fullTopicName}; + } + + void CreateMetarequestActor( + const TActorId& edge, const TString& topicPath, auto* runtime, const auto& kafkaConfig, const TActorId& fakeCacheId = {} + ) { + TMetadataRequestData::TPtr metaRequest = std::make_shared(); + metaRequest->Topics.emplace_back(); + auto& topic = metaRequest->Topics[0]; + topic.Name = topicPath; + + auto context = std::make_shared(kafkaConfig); + context->ConnectionId = edge; + context->DatabasePath = "/Root"; + context->UserToken = new NACLib::TUserToken("root@builtin", {}); + + TActorId actorId; + if (fakeCacheId) { + actorId = runtime->Register(new NKafka::TKafkaMetadataActor( + context, 1, TMessagePtr(std::make_shared(), metaRequest), fakeCacheId + )); + } else { + actorId = runtime->Register(new NKafka::TKafkaMetadataActor( + context, 1, TMessagePtr(std::make_shared(), metaRequest), + NKafka::MakeKafkaDiscoveryCacheID() + )); + } + runtime->EnableScheduleForActor(actorId); + } + + void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false) { + TAutoPtr handle; + auto* ev = runtime->GrabEdgeEvent(handle); + 
UNIT_ASSERT(ev); + auto response = dynamic_cast(ev->Response.get()); + UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1); + if (!error) { + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR); + } else { + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND); + UNIT_ASSERT(ev->ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND); + return; + } + UNIT_ASSERT_VALUES_EQUAL(response->Brokers.size(), 1); + Cerr << "Broker " << response->Brokers[0].NodeId << " - " << response->Brokers[0].Host << ":" << response->Brokers[0].Port << Endl; + UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Port, kafkaPort); + } + + Y_UNIT_TEST(MetadataActorGetsEndpoint) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime, + config); + + CheckKafkaMetaResponse(runtime, kafkaPort); + } + + Y_UNIT_TEST(DiscoveryResponsesWithNoNode) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + Ydb::Discovery::ListEndpointsResult leResult; + auto* ep = leResult.add_endpoints(); + ep->set_address("wrong.host"); + ep->set_port(1); + ep->set_node_id(9999); + ep = leResult.add_endpoints(); + ep->set_address("wrong.host2"); + ep->set_port(2); + ep->set_node_id(9998); + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime, + config, fakeCache); + + CheckKafkaMetaResponse(runtime, kafkaPort); + } + + Y_UNIT_TEST(DiscoveryResponsesWithError) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + 
Ydb::Discovery::ListEndpointsResult leResult; + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, true)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime, + config, fakeCache); + + CheckKafkaMetaResponse(runtime, kafkaPort, true); + } + + Y_UNIT_TEST(DiscoveryResponsesWithOtherPort) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + Ydb::Discovery::ListEndpointsResult leResult; + auto* ep = leResult.add_endpoints(); + ep->set_address("localhost"); + ep->set_port(12345); + ep->set_node_id(runtime->GetNodeId(0)); + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime, + config, fakeCache); + + CheckKafkaMetaResponse(runtime, 12345); + } + } +} \ No newline at end of file diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 86055c01b656..9d9f12c78b80 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -121,6 +121,7 @@ class TTestServer { appConfig.MutablePQConfig()->AddValidWriteSpeedLimitsKbPerSec(512); appConfig.MutablePQConfig()->AddValidWriteSpeedLimitsKbPerSec(1_KB); + appConfig.MutableGRpcConfig()->SetHost("::1"); auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits(); limit->SetMinPeriodSeconds(0); limit->SetMaxPeriodSeconds(TDuration::Days(1).Seconds()); @@ -835,10 +836,10 @@ class TTestClient { }; Y_UNIT_TEST_SUITE(KafkaProtocol) { - // this test imitates kafka producer behaviour: - // 1. get api version, - // 2. authenticate via sasl, - // 3. acquire producer id, + // this test imitates kafka producer behaviour: + // 1. get api version, + // 2. authenticate via sasl, + // 3. 
acquire producer id, // 4. produce to topic several messages, read them and assert correct contents and metadata Y_UNIT_TEST(ProduceScenario) { TInsecureTestServer testServer("2"); @@ -1234,7 +1235,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(dataStr, value); } } - + // create table and init cdc for it { NYdb::NTable::TTableClient tableClient(*testServer.Driver); diff --git a/ydb/core/kafka_proxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make index 361a5117a612..0ad4df04ab5f 100644 --- a/ydb/core/kafka_proxy/ut/ya.make +++ b/ydb/core/kafka_proxy/ut/ya.make @@ -1,12 +1,12 @@ UNITTEST_FOR(ydb/core/kafka_proxy) SIZE(medium) - SRCS( ut_kafka_functions.cpp ut_protocol.cpp ut_serialization.cpp metarequest_ut.cpp + port_discovery_ut.cpp ) PEERDIR( diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index a4880134e383..cf9e200dbc30 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -62,6 +62,7 @@ #include #include #include +#include #include #include #include @@ -1119,21 +1120,45 @@ namespace Tests { TActorId actorId = Runtime->Register(actor, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId); Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx); } - if (Settings->AppConfig->GetKafkaProxyConfig().GetEnableKafkaProxy()) { + + IActor* discoveryCache = CreateDiscoveryCache(NGRpcService::KafkaEndpointId); + TActorId discoveryCacheId = Runtime->Register(discoveryCache, nodeIdx, userPoolId); + Runtime->RegisterService(NKafka::MakeKafkaDiscoveryCacheID(), discoveryCacheId, nodeIdx); + NKafka::TListenerSettings settings; settings.Port = Settings->AppConfig->GetKafkaProxyConfig().GetListeningPort(); + bool ssl = false; if (Settings->AppConfig->GetKafkaProxyConfig().HasSslCertificate()) { + ssl = true; settings.SslCertificatePem = Settings->AppConfig->GetKafkaProxyConfig().GetSslCertificate(); } - IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, 
Settings->AppConfig->GetKafkaProxyConfig()); + IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig(), + discoveryCacheId); TActorId actorId = Runtime->Register(actor, nodeIdx, userPoolId); Runtime->RegisterService(TActorId{}, actorId, nodeIdx); IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{Runtime->GetAppData().Counters->GetSubgroup("counters", "kafka_proxy")}); TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx, userPoolId); Runtime->RegisterService(NKafka::MakeKafkaMetricsServiceID(), metricsActorId, nodeIdx); + + { + auto& appData = Runtime->GetAppData(0); + + TIntrusivePtr desc = new NGRpcService::TGrpcEndpointDescription(); + desc->Address = Settings->AppConfig->GetGRpcConfig().GetHost(); + desc->Port = settings.Port; + desc->Ssl = ssl; + desc->EndpointId = NGRpcService::KafkaEndpointId; + + TVector rootDomains; + if (const auto& domain = appData.DomainsInfo->Domain) { + rootDomains.emplace_back("/" + domain->Name); + } + desc->ServedDatabases.insert(desc->ServedDatabases.end(), rootDomains.begin(), rootDomains.end()); + Runtime->GetActorSystem(0)->Register(NGRpcService::CreateGrpcEndpointPublishActor(desc.Get()), TMailboxType::ReadAsFilled, appData.UserPoolId); + } } if (Settings->EnableYq) { diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index 8110744f3ac9..362280204afc 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -506,7 +506,7 @@ struct TEvPQProxy { ui64 PartitionId; ui64 Generation; ui64 NodeId; - TString Hostname; + //TString Hostname; }; struct TEvPartitionLocationResponse : public NActors::TEventLocal diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 7d3f3b75235b..9fc6789921df 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ 
b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1341,13 +1341,10 @@ void TPartitionsLocationActor::Bootstrap(const NActors::TActorContext&) { SendDescribeProposeRequest(); UnsafeBecome(&TPartitionsLocationActor::StateWork); - SendNodesRequest(); - } void TPartitionsLocationActor::StateWork(TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - hFunc(TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, Handle); default: if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) { TBase::StateWork(ev); @@ -1382,44 +1379,16 @@ bool TPartitionsLocationActor::ApplyResponse( partLocation.NodeId = nodeId; Response->Partitions.emplace_back(std::move(partLocation)); } - if (GotNodesInfo) - Finalize(); - else - GotPartitions = true; + Finalize(); return true; } -void TPartitionsLocationActor::SendNodesRequest() const { - auto* icEv = new TEvICNodesInfoCache::TEvGetAllNodesInfoRequest(); - ActorContext().Send(CreateICNodesInfoCacheServiceId(), icEv); - -} - -void TPartitionsLocationActor::Handle(TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev) { - NodesInfoEv = ev; - if (GotPartitions) - Finalize(); - else - GotNodesInfo = true; -} - void TPartitionsLocationActor::Finalize() { if (Settings.Partitions) { Y_ABORT_UNLESS(Response->Partitions.size() == Settings.Partitions.size()); } else { Y_ABORT_UNLESS(Response->Partitions.size() == PQGroupInfo->Description.PartitionsSize()); } - for (auto& pInResponse : Response->Partitions) { - auto iter = NodesInfoEv->Get()->NodeIdsMapping->find(pInResponse.NodeId); - if (iter.IsEnd()) { - return RaiseError( - TStringBuilder() << "Hostname not found for nodeId " << pInResponse.NodeId, - Ydb::PersQueue::ErrorCode::ERROR, - Ydb::StatusIds::INTERNAL_ERROR, ActorContext() - ); - } - pInResponse.Hostname = (*NodesInfoEv->Get()->Nodes)[iter->second].Host; - } TBase::RespondWithCode(Ydb::StatusIds::SUCCESS); } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h index 
a5a78290cef1..46b066858718 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.h +++ b/ydb/services/persqueue_v1/actors/schema_actors.h @@ -449,16 +449,9 @@ using TBase = TPQInternalSchemaActor PartitionIds; - - bool GotPartitions = false; - bool GotNodesInfo = false; }; } // namespace NKikimr::NGRpcProxy::V1 diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp index a05ab225a3f0..96d71039dcd6 100644 --- a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp +++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp @@ -210,7 +210,6 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) { THashSet allParts; for (const auto& p : ev->Partitions) { - UNIT_ASSERT(!p.Hostname.empty()); UNIT_ASSERT(p.NodeId > 0); // UNIT_ASSERT(p.IncGeneration > 0); UNIT_ASSERT(p.PartitionId < 15);