Avoid request from IC in location actor
FloatingCrowbar committed Jan 10, 2025
1 parent 8edbfd6 commit 4863285
Showing 6 changed files with 151 additions and 123 deletions.
186 changes: 104 additions & 82 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -23,11 +23,14 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
if (WithProxy) {
AddProxyNodeToBrokers();
} else {
SendDiscoveryRequest();
}
if (Context->Config.GetEnableEndpointDiscovery())
SendDiscoveryRequest();
else
RequestICNodeCache();

if (Message->Topics.size() == 0 && !WithProxy) {
AddCurrentNodeToBrokers();
if (Message->Topics.size() == 0) {
NeedCurrentNode = true;
}
}

if (Message->Topics.size() != 0) {
@@ -38,17 +41,7 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
RespondIfRequired(ctx);
}

void TKafkaMetadataActor::AddCurrentNodeToBrokers() {
NeedCurrentNode = true;
if (!DiscoveryRequested) {
RequestICNodeCache();
}
}

void TKafkaMetadataActor::SendDiscoveryRequest() {
if (!Context->Config.GetEnableEndpointDiscovery())
return;
DiscoveryRequested = true;
if (!DiscoveryCacheActor) {
OwnDiscoveryCache = true;
DiscoveryCacheActor = Register(CreateDiscoveryCache(NGRpcService::KafkaEndpointId));
@@ -60,40 +53,17 @@ void TKafkaMetadataActor::SendDiscoveryRequest() {

void TKafkaMetadataActor::HandleDiscoveryError(TEvDiscovery::TEvError::TPtr& ev) {
PendingResponses--;
if (NeedCurrentNode) {
RequestICNodeCache();
}
DiscoveryRequested = false;
for (auto& [index, ev] : PendingTopicResponses) {
auto& topic = Response->Topics[index];
AddTopicResponse(topic, ev->Get());
}
PendingTopicResponses.clear();
HaveError = true;
RespondIfRequired(ActorContext());
}

void TKafkaMetadataActor::HandleDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const NActors::TActorContext& ctx) {
PendingResponses--;
ProcessDiscoveryData(ev);
if (NeedCurrentNode) {
auto currNodeIter = Nodes.find(SelfId().NodeId());
if (currNodeIter == Nodes.end()) {
RequestICNodeCache();
} else {
Y_ABORT_UNLESS(!CurrentNodeHostname.empty());
AddBroker(ctx.SelfID.NodeId(), CurrentNodeHostname, currNodeIter->second);
}
}
DiscoveryRequested = false;
for (auto& [index, ev] : PendingTopicResponses) {
auto& topic = Response->Topics[index];
AddTopicResponse(topic, ev->Get());
}
PendingTopicResponses.clear();
RespondIfRequired(ActorContext());
}

bool TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev) {
void TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev) {
bool expectSsl = Context->Config.HasSslCertificate();

Ydb::Discovery::ListEndpointsResponse leResponse;
@@ -108,23 +78,35 @@ bool TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::T
if (ok) {
ok = leResponse.operation().result().UnpackTo(&leResult);
}
if (!ok)
return false;
if (!ok) {
HaveError = true;
return;
}

for (auto& endpoint : leResult.endpoints()) {
Nodes.insert({endpoint.node_id(), endpoint.port()});
if (NeedCurrentNode && endpoint.node_id() == SelfId().NodeId()) {
CurrentNodeHostname = endpoint.address();
}
Nodes.insert({endpoint.node_id(), {endpoint.address(), endpoint.port()}});
}
return true;
}

void TKafkaMetadataActor::RequestICNodeCache() {
Y_ABORT_UNLESS(!FallbackToIcDiscovery);
FallbackToIcDiscovery = true;
PendingResponses++;
Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
}

void TKafkaMetadataActor::HandleNodesResponse(
NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev,
const NActors::TActorContext& ctx
) {
Y_ABORT_UNLESS(FallbackToIcDiscovery);
for (const auto& [nodeId, index] : *ev->Get()->NodeIdsMapping) {
Nodes[nodeId] = {(*ev->Get()->Nodes)[index].Host, (ui32)Context->Config.GetListeningPort()};
}
--PendingResponses;
RespondIfRequired(ctx);
}

void TKafkaMetadataActor::AddProxyNodeToBrokers() {
AddBroker(ProxyNodeId, Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort());
}
@@ -153,25 +135,6 @@ void TKafkaMetadataActor::ProcessTopics() {
}
}

void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
Y_ABORT_UNLESS(!iter.IsEnd());
auto host = (*ev->Get()->Nodes)[iter->second].Host;
KAFKA_LOG_D("Incoming TEvGetAllNodesInfoResponse. Host#: " << host);
AddBroker(ctx.SelfID.NodeId(), host, Context->Config.GetListeningPort());

--PendingResponses;
RespondIfRequired(ctx);
}

void TKafkaMetadataActor::AddBroker(ui64 nodeId, const TString& host, ui64 port) {
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = nodeId;
broker.Host = host;
broker.Port = port;
Response->Brokers.emplace_back(std::move(broker));
}

TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) {
KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << Context->UserToken->GetUserSID() << "'");

@@ -185,17 +148,34 @@ TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMeta
return Register(new TPartitionsLocationActor(locationRequest, SelfId()));
}

TVector<TKafkaMetadataActor::TNodeInfo*> TKafkaMetadataActor::CheckTopicNodes(TEvLocationResponse* response) {
TVector<TNodeInfo*> partitionNodes;
for (const auto& part : response->Partitions) {
auto iter = Nodes.find(part.NodeId);
if (iter.IsEnd()) {
return {};
}
partitionNodes.push_back(&iter->second);
}
return partitionNodes;
}

void TKafkaMetadataActor::AddTopicError(
TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode
) {
topic.ErrorCode = errorCode;
ErrorCode = errorCode;
}

void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
void TKafkaMetadataActor::AddTopicResponse(
TMetadataResponseData::TMetadataResponseTopic& topic,
TEvLocationResponse* response,
const TVector<TKafkaMetadataActor::TNodeInfo*>& partitionNodes
) {
topic.ErrorCode = NONE_ERROR;

topic.Partitions.reserve(response->Partitions.size());
auto nodeIter = partitionNodes.begin();
for (const auto& part : response->Partitions) {
auto nodeId = WithProxy ? ProxyNodeId : part.NodeId;

@@ -212,22 +192,18 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo
if (!WithProxy) {
auto ins = AllClusterNodes.insert(part.NodeId);
if (ins.second) {
auto hostname = part.Hostname;
auto hostname = (*nodeIter)->Host;
if (hostname.StartsWith(UnderlayPrefix)) {
hostname = hostname.substr(sizeof(UnderlayPrefix) - 1);
}
ui64 port = Context->Config.GetListeningPort();
auto nodeIter = Nodes.find(part.NodeId);
if (!nodeIter.IsEnd())
port = nodeIter->second;

AddBroker(part.NodeId, hostname, port);
AddBroker(part.NodeId, hostname, (*nodeIter)->Port);
}
}
++nodeIter;
}
}

void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) {
void TKafkaMetadataActor::HandleLocationResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) {
--PendingResponses;

auto* r = ev->Get();
@@ -243,32 +219,78 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc

if (actorIter->second.empty()) {
KAFKA_LOG_CRIT("Corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");

return RespondIfRequired(ctx);
}

for (auto index : actorIter->second) {
auto& topic = Response->Topics[index];
if (r->Status == Ydb::StatusIds::SUCCESS) {
KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful");
if (DiscoveryRequested) {
PendingTopicResponses.insert(std::make_pair(index, ev));
} else {
AddTopicResponse(topic, r);
}
PendingTopicResponses.insert(std::make_pair(index, ev->Release()));
} else {
KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString());
AddTopicError(topic, ConvertErrorCode(r->Status));
}
}
RespondIfRequired(ActorContext());
RespondIfRequired(ctx);
}

void TKafkaMetadataActor::AddBroker(ui64 nodeId, const TString& host, ui64 port) {
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = nodeId;
broker.Host = host;
broker.Port = port;
Response->Brokers.emplace_back(std::move(broker));
}

void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
if (PendingResponses == 0 && !PendingTopicResponses) {
auto Respond = [&] {
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, ErrorCode));
Die(ctx);
};

if (HaveError) {
ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR;
for (auto& topic : Response->Topics) {
AddTopicError(topic, ErrorCode);
}
Respond();
return;
}
if (PendingResponses != 0) {
return;
}

if (NeedCurrentNode) {
auto nodeIter = Nodes.find(SelfId().NodeId());
if (nodeIter.IsEnd()) {
// Node info was not found, request from IC nodes cache instead
Y_ABORT_UNLESS(!FallbackToIcDiscovery);
RequestICNodeCache();
return;
}
AddBroker(nodeIter->first, nodeIter->second.Host, nodeIter->second.Port);
}
while (!PendingTopicResponses.empty()) {
auto& [index, ev] = *PendingTopicResponses.begin();
auto& topic = Response->Topics[index];
auto topicNodes = CheckTopicNodes(ev.Get());
if (topicNodes.empty()) {
if (!FallbackToIcDiscovery) {
// Node info wasn't found via discovery, fallback to interconnect
RequestICNodeCache();
return;
} else {
// Already tried both YDB discovery and interconnect, still couldn't find the node for partition. Throw error
AddTopicError(topic, EKafkaErrors::UNKNOWN_SERVER_ERROR);
}
} else {
AddTopicResponse(topic, ev.Get(), topicNodes);
}
PendingTopicResponses.erase(PendingTopicResponses.begin());
}

Respond();
}

void TKafkaMetadataActor::Die(const TActorContext& ctx) {
Expand Down
31 changes: 20 additions & 11 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
@@ -26,25 +26,33 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
private:
using TEvLocationResponse = NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse;

struct TNodeInfo {
TString Host;
ui32 Port;
};

TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
void HandleResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
void HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx);
void HandleLocationResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
void HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev,
const NActors::TActorContext& ctx);
void HandleDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev, const NActors::TActorContext& ctx);
void HandleDiscoveryError(NKikimr::TEvDiscovery::TEvError::TPtr& ev);

void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response);
void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response,
const TVector<TNodeInfo*>& nodes);
void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode);
void RespondIfRequired(const NActors::TActorContext& ctx);
void AddProxyNodeToBrokers();
void AddCurrentNodeToBrokers();
void AddBroker(ui64 nodeId, const TString& host, ui64 port);
void RequestICNodeCache();
void ProcessTopics();
void SendDiscoveryRequest();
bool ProcessDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev);
void ProcessDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev);
TVector<TNodeInfo*> CheckTopicNodes(TEvLocationResponse* response);

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvLocationResponse, HandleResponse);
HFunc(TEvLocationResponse, HandleLocationResponse);
HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse);
HFunc(NKikimr::TEvDiscovery::TEvDiscoveryData, HandleDiscoveryData);
hFunc(NKikimr::TEvDiscovery::TEvError, HandleDiscoveryError);
@@ -68,12 +76,13 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR;

TActorId DiscoveryCacheActor;
bool NeedCurrentNode = false;
TString CurrentNodeHostname;
bool DiscoveryRequested = false;
bool OwnDiscoveryCache = false;
THashMap<ui64, ui64> Nodes;
TMap<ui64, TEvLocationResponse::TPtr> PendingTopicResponses;
bool NeedCurrentNode = false;
bool HaveError = false;
bool FallbackToIcDiscovery = false;
TMap<ui64, TAutoPtr<TEvLocationResponse>> PendingTopicResponses;

THashMap<ui64, TNodeInfo> Nodes;
};

} // namespace NKafka
