From 48632857489d6b10ac17b18b478cea34ffe8ae7b Mon Sep 17 00:00:00 2001 From: Konstantin Melekhov Date: Fri, 10 Jan 2025 13:08:27 +0000 Subject: [PATCH] Avoid request from IC in location actor --- .../actors/kafka_metadata_actor.cpp | 186 ++++++++++-------- .../kafka_proxy/actors/kafka_metadata_actor.h | 31 +-- ydb/core/kafka_proxy/ut/port_discovery_ut.cpp | 25 ++- ydb/services/persqueue_v1/actors/events.h | 2 +- .../persqueue_v1/actors/schema_actors.cpp | 23 +-- .../persqueue_v1/actors/schema_actors.h | 7 - 6 files changed, 151 insertions(+), 123 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index 64aea7602ab4..d565aa6d15e2 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -23,11 +23,14 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { if (WithProxy) { AddProxyNodeToBrokers(); } else { - SendDiscoveryRequest(); - } + if (Context->Config.GetEnableEndpointDiscovery()) + SendDiscoveryRequest(); + else + RequestICNodeCache(); - if (Message->Topics.size() == 0 && !WithProxy) { - AddCurrentNodeToBrokers(); + if (Message->Topics.size() == 0) { + NeedCurrentNode = true; + } } if (Message->Topics.size() != 0) { @@ -38,17 +41,7 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { RespondIfRequired(ctx); } -void TKafkaMetadataActor::AddCurrentNodeToBrokers() { - NeedCurrentNode = true; - if (!DiscoveryRequested) { - RequestICNodeCache(); - } -} - void TKafkaMetadataActor::SendDiscoveryRequest() { - if (!Context->Config.GetEnableEndpointDiscovery()) - return; - DiscoveryRequested = true; if (!DiscoveryCacheActor) { OwnDiscoveryCache = true; DiscoveryCacheActor = Register(CreateDiscoveryCache(NGRpcService::KafkaEndpointId)); @@ -60,40 +53,17 @@ void TKafkaMetadataActor::SendDiscoveryRequest() { void TKafkaMetadataActor::HandleDiscoveryError(TEvDiscovery::TEvError::TPtr& ev) { PendingResponses--; - if (NeedCurrentNode) { - RequestICNodeCache(); - } - DiscoveryRequested = false; - for (auto& [index, ev] : PendingTopicResponses) { - auto& topic = Response->Topics[index]; - AddTopicResponse(topic, ev->Get()); - } - PendingTopicResponses.clear(); + HaveError = true; RespondIfRequired(ActorContext()); } void TKafkaMetadataActor::HandleDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const NActors::TActorContext& ctx) { PendingResponses--; ProcessDiscoveryData(ev); - if (NeedCurrentNode) { - auto currNodeIter = Nodes.find(SelfId().NodeId()); - if (currNodeIter == Nodes.end()) { - RequestICNodeCache(); - } else { - Y_ABORT_UNLESS(!CurrentNodeHostname.empty()); - AddBroker(ctx.SelfID.NodeId(), CurrentNodeHostname, currNodeIter->second); - } - } - DiscoveryRequested = false; - for (auto& [index, ev] : PendingTopicResponses) { - auto& topic = Response->Topics[index]; - AddTopicResponse(topic, ev->Get()); - } - PendingTopicResponses.clear(); RespondIfRequired(ActorContext()); } -bool TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev) { +void TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev) { bool expectSsl = Context->Config.HasSslCertificate(); Ydb::Discovery::ListEndpointsResponse leResponse; @@ -108,23 +78,35 @@ bool TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::T if (ok) { ok = leResponse.operation().result().UnpackTo(&leResult); } - if (!ok) - return false; + if (!ok) { + HaveError = true; + return; + } for (auto& 
endpoint : leResult.endpoints()) { - Nodes.insert({endpoint.node_id(), endpoint.port()}); - if (NeedCurrentNode && endpoint.node_id() == SelfId().NodeId()) { - CurrentNodeHostname = endpoint.address(); - } + Nodes.insert({endpoint.node_id(), {endpoint.address(), endpoint.port()}}); } - return true; } void TKafkaMetadataActor::RequestICNodeCache() { + Y_ABORT_UNLESS(!FallbackToIcDiscovery); + FallbackToIcDiscovery = true; PendingResponses++; Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest()); } +void TKafkaMetadataActor::HandleNodesResponse( + NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, + const NActors::TActorContext& ctx +) { + Y_ABORT_UNLESS(FallbackToIcDiscovery); + for (const auto& [nodeId, index] : *ev->Get()->NodeIdsMapping) { + Nodes[nodeId] = {(*ev->Get()->Nodes)[index].Host, (ui32)Context->Config.GetListeningPort()}; + } + --PendingResponses; + RespondIfRequired(ctx); +} + void TKafkaMetadataActor::AddProxyNodeToBrokers() { AddBroker(ProxyNodeId, Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort()); } @@ -153,25 +135,6 @@ void TKafkaMetadataActor::ProcessTopics() { } } -void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) { - auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId()); - Y_ABORT_UNLESS(!iter.IsEnd()); - auto host = (*ev->Get()->Nodes)[iter->second].Host; - KAFKA_LOG_D("Incoming TEvGetAllNodesInfoResponse. Host#: " << host); - AddBroker(ctx.SelfID.NodeId(), host, Context->Config.GetListeningPort()); - - --PendingResponses; - RespondIfRequired(ctx); -} - -void TKafkaMetadataActor::AddBroker(ui64 nodeId, const TString& host, ui64 port) { - auto broker = TMetadataResponseData::TMetadataResponseBroker{}; - broker.NodeId = nodeId; - broker.Host = host; - broker.Port = port; - Response->Brokers.emplace_back(std::move(broker)); -} - TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) { KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << Context->UserToken->GetUserSID() << "'"); @@ -185,6 +148,18 @@ TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMeta return Register(new TPartitionsLocationActor(locationRequest, SelfId())); } +TVector TKafkaMetadataActor::CheckTopicNodes(TEvLocationResponse* response) { + TVector partitionNodes; + for (const auto& part : response->Partitions) { + auto iter = Nodes.find(part.NodeId); + if (iter.IsEnd()) { + return {}; + } + partitionNodes.push_back(&iter->second); + } + return partitionNodes; +} + void TKafkaMetadataActor::AddTopicError( TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode ) { @@ -192,10 +167,15 @@ void TKafkaMetadataActor::AddTopicError( ErrorCode = errorCode; } -void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) { +void TKafkaMetadataActor::AddTopicResponse( + TMetadataResponseData::TMetadataResponseTopic& topic, + TEvLocationResponse* response, + const TVector& partitionNodes +) { topic.ErrorCode = NONE_ERROR; topic.Partitions.reserve(response->Partitions.size()); + auto nodeIter = partitionNodes.begin(); for (const auto& part : response->Partitions) { auto nodeId = WithProxy ? 
ProxyNodeId : part.NodeId; @@ -212,22 +192,18 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo if (!WithProxy) { auto ins = AllClusterNodes.insert(part.NodeId); if (ins.second) { - auto hostname = part.Hostname; + auto hostname = (*nodeIter)->Host; if (hostname.StartsWith(UnderlayPrefix)) { hostname = hostname.substr(sizeof(UnderlayPrefix) - 1); } - ui64 port = Context->Config.GetListeningPort(); - auto nodeIter = Nodes.find(part.NodeId); - if (!nodeIter.IsEnd()) - port = nodeIter->second; - - AddBroker(part.NodeId, hostname, port); + AddBroker(part.NodeId, hostname, (*nodeIter)->Port); } } + ++nodeIter; } } -void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) { +void TKafkaMetadataActor::HandleLocationResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) { --PendingResponses; auto* r = ev->Get(); @@ -243,7 +219,6 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc if (actorIter->second.empty()) { KAFKA_LOG_CRIT("Corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply"); - return RespondIfRequired(ctx); } @@ -251,24 +226,71 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc auto& topic = Response->Topics[index]; if (r->Status == Ydb::StatusIds::SUCCESS) { KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful"); - if (DiscoveryRequested) { - PendingTopicResponses.insert(std::make_pair(index, ev)); - } else { - AddTopicResponse(topic, r); - } + PendingTopicResponses.insert(std::make_pair(index, ev->Release())); } else { KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString()); AddTopicError(topic, ConvertErrorCode(r->Status)); } } - RespondIfRequired(ActorContext()); + RespondIfRequired(ctx); +} + +void TKafkaMetadataActor::AddBroker(ui64 nodeId, const TString& host, ui64 port) { + auto broker = TMetadataResponseData::TMetadataResponseBroker{}; + broker.NodeId = nodeId; + broker.Host = host; + broker.Port = port; + Response->Brokers.emplace_back(std::move(broker)); } void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) { - if (PendingResponses == 0 && !PendingTopicResponses) { + auto Respond = [&] { Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, ErrorCode)); Die(ctx); + }; + + if (HaveError) { + ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR; + for (auto& topic : Response->Topics) { + AddTopicError(topic, ErrorCode); + } + Respond(); + return; } + if (PendingResponses != 0) { + return; + } + + if (NeedCurrentNode) { + auto nodeIter = Nodes.find(SelfId().NodeId()); + if (nodeIter.IsEnd()) { + // Node info was not found, request from IC nodes cache instead + Y_ABORT_UNLESS(!FallbackToIcDiscovery); + RequestICNodeCache(); + return; + } + AddBroker(nodeIter->first, nodeIter->second.Host, nodeIter->second.Port); + } + while (!PendingTopicResponses.empty()) { + auto& [index, ev] = *PendingTopicResponses.begin(); + auto& topic = Response->Topics[index]; + auto topicNodes = CheckTopicNodes(ev.Get()); + if (topicNodes.empty()) { + if (!FallbackToIcDiscovery) { + // Node info wasn't found via discovery, fallback to interconnect + RequestICNodeCache(); + return; + } else { + // Already tried both YDB discovery and interconnect, still couldn't find the node for partition. 
Throw error + AddTopicError(topic, EKafkaErrors::UNKNOWN_SERVER_ERROR); + } + } else { + AddTopicResponse(topic, ev.Get(), topicNodes); + } + PendingTopicResponses.erase(PendingTopicResponses.begin()); + } + + Respond(); } void TKafkaMetadataActor::Die(const TActorContext& ctx) { diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h index 7d21f2e87500..8a6e7902f02a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h @@ -26,25 +26,33 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped& nodes); void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode); void RespondIfRequired(const NActors::TActorContext& ctx); void AddProxyNodeToBrokers(); - void AddCurrentNodeToBrokers(); void AddBroker(ui64 nodeId, const TString& host, ui64 port); void RequestICNodeCache(); void ProcessTopics(); void SendDiscoveryRequest(); - bool ProcessDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev); + void ProcessDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev); + TVector CheckTopicNodes(TEvLocationResponse* response); + STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { - HFunc(TEvLocationResponse, HandleResponse); + HFunc(TEvLocationResponse, HandleLocationResponse); HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse); HFunc(NKikimr::TEvDiscovery::TEvDiscoveryData, HandleDiscoveryData); hFunc(NKikimr::TEvDiscovery::TEvError, HandleDiscoveryError); @@ -68,12 +76,13 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped Nodes; - TMap PendingTopicResponses; + bool NeedCurrentNode = false; + bool HaveError = false; + bool FallbackToIcDiscovery = false; + TMap> PendingTopicResponses; + + THashMap Nodes; }; } // namespace NKafka diff --git a/ydb/core/kafka_proxy/ut/port_discovery_ut.cpp b/ydb/core/kafka_proxy/ut/port_discovery_ut.cpp index 0b0a5ad104c8..909486a78e73 100644 --- a/ydb/core/kafka_proxy/ut/port_discovery_ut.cpp +++ b/ydb/core/kafka_proxy/ut/port_discovery_ut.cpp @@ -216,13 +216,19 @@ namespace NKafka::NTests { runtime->EnableScheduleForActor(actorId); } - void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort) { + void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false) { TAutoPtr handle; auto* ev = runtime->GrabEdgeEvent(handle); UNIT_ASSERT(ev); auto response = dynamic_cast(ev->Response.get()); UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1); - UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR); + if (!error) { + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR); + } else { + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::UNKNOWN_SERVER_ERROR); + UNIT_ASSERT(ev->ErrorCode == EKafkaErrors::UNKNOWN_SERVER_ERROR); + return; + } UNIT_ASSERT_VALUES_EQUAL(response->Brokers.size(), 1); Cerr << "Broker " << response->Brokers[0].NodeId << " - " << response->Brokers[0].Host << ":" << response->Brokers[0].Port << Endl; UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Port, kafkaPort); @@ -263,6 +269,21 @@ namespace NKafka::NTests { CheckKafkaMetaResponse(runtime, kafkaPort); } + Y_UNIT_TEST(DiscoveryResponsesWithError) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + Ydb::Discovery::ListEndpointsResult leResult; + auto fakeCache = 
runtime->Register(new TFakeDiscoveryCache(leResult, true)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime, + config, fakeCache); + + CheckKafkaMetaResponse(runtime, kafkaPort, true); + } + Y_UNIT_TEST(DiscoveryResponsesWithOtherPort) { auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index 8110744f3ac9..362280204afc 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -506,7 +506,7 @@ struct TEvPQProxy { ui64 PartitionId; ui64 Generation; ui64 NodeId; - TString Hostname; + //TString Hostname; }; struct TEvPartitionLocationResponse : public NActors::TEventLocal diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index f1bbf1644979..cc629db40dad 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1478,13 +1478,10 @@ void TPartitionsLocationActor::Bootstrap(const NActors::TActorContext&) { SendDescribeProposeRequest(); UnsafeBecome(&TPartitionsLocationActor::StateWork); - SendNodesRequest(); - } void TPartitionsLocationActor::StateWork(TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - hFunc(TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, Handle); default: if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) { TBase::StateWork(ev); @@ -1519,13 +1516,10 @@ bool TPartitionsLocationActor::ApplyResponse( partLocation.NodeId = nodeId; Response->Partitions.emplace_back(std::move(partLocation)); } - if (GotNodesInfo) - Finalize(); - else - GotPartitions = true; + Finalize(); return true; } - +/* void TPartitionsLocationActor::SendNodesRequest() const { auto* icEv = new TEvICNodesInfoCache::TEvGetAllNodesInfoRequest(); ActorContext().Send(CreateICNodesInfoCacheServiceId(), icEv); @@ -1539,24 +1533,13 @@ void TPartitionsLocationActor::Handle(TEvICNodesInfoCache::TEvGetAllNodesInfoRes else GotNodesInfo = true; } - +*/ void TPartitionsLocationActor::Finalize() { if (Settings.Partitions) { Y_ABORT_UNLESS(Response->Partitions.size() == Settings.Partitions.size()); } else { Y_ABORT_UNLESS(Response->Partitions.size() == PQGroupInfo->Description.PartitionsSize()); } - for (auto& pInResponse : Response->Partitions) { - auto iter = NodesInfoEv->Get()->NodeIdsMapping->find(pInResponse.NodeId); - if (iter.IsEnd()) { - return RaiseError( - TStringBuilder() << "Hostname not found for nodeId " << pInResponse.NodeId, - Ydb::PersQueue::ErrorCode::ERROR, - Ydb::StatusIds::INTERNAL_ERROR, ActorContext() - ); - } - pInResponse.Hostname = (*NodesInfoEv->Get()->Nodes)[iter->second].Host; - } TBase::RespondWithCode(Ydb::StatusIds::SUCCESS); } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h index a5a78290cef1..46b066858718 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.h +++ b/ydb/services/persqueue_v1/actors/schema_actors.h @@ -449,16 +449,9 @@ using TBase = TPQInternalSchemaActor PartitionIds; - - bool GotPartitions = false; - bool GotNodesInfo = false; }; } // namespace NKikimr::NGRpcProxy::V1
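
Note (not part of the diff): with this change the metadata actor resolves a partition's broker address from the endpoints returned by YDB discovery first, falls back to the interconnect node cache at most once, and only fails the topic with UNKNOWN_SERVER_ERROR when both sources have been tried. The standalone sketch below only models that decision order as implemented by the reworked CheckTopicNodes()/RespondIfRequired(); the names used in it (TNodeInfo, EAction, ResolveTopic) are illustrative and do not exist in the YDB tree.

// Standalone sketch, not part of the patch: models the lookup/fallback order only.
// All identifiers below are illustrative; they are not YDB symbols.
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

struct TNodeInfo {               // same shape as the {address, port} entries kept in Nodes
    std::string Host;
    uint32_t Port = 0;
};

enum class EAction { Respond, QueryIcCache, FailTopic };

// Decide what to do for one topic given the node ids of its partitions:
//  - every node id known                 -> build the topic response;
//  - some id unknown, fallback not tried -> query the interconnect node cache once;
//  - some id unknown after the fallback  -> answer the topic with UNKNOWN_SERVER_ERROR.
EAction ResolveTopic(const std::unordered_map<uint64_t, TNodeInfo>& nodes,
                     const std::vector<uint64_t>& partitionNodeIds,
                     bool fallbackToIcDone) {
    for (uint64_t id : partitionNodeIds) {
        if (nodes.find(id) == nodes.end()) {
            return fallbackToIcDone ? EAction::FailTopic : EAction::QueryIcCache;
        }
    }
    return EAction::Respond;
}

int main() {
    std::unordered_map<uint64_t, TNodeInfo> nodes{{1, {"host-1", 9092}}};
    // Partition on node 2 is unknown: the first pass asks the IC cache; a second
    // pass with fallbackToIcDone=true fails the topic instead of looping.
    std::cout << int(ResolveTopic(nodes, {1, 2}, false)) << "\n"; // 1: QueryIcCache
    std::cout << int(ResolveTopic(nodes, {1, 2}, true))  << "\n"; // 2: FailTopic
    std::cout << int(ResolveTopic(nodes, {1},    true))  << "\n"; // 0: Respond
}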