Kafka port assignment through discoveryCache #13197

Merged
11 commits, merged on Jan 17, 2025
20 changes: 15 additions & 5 deletions ydb/core/discovery/discovery.cpp
@@ -184,7 +184,6 @@ namespace NDiscovery {
if (!CheckEndpointId(endpointId, entry)) {
continue;
}

if (entry.GetSsl()) {
AddEndpoint(cachedMessageSsl, statesSsl, entry);
} else {
@@ -200,7 +199,6 @@ namespace NDiscovery {
cachedMessageSsl.set_self_location(location);
}
}

return {SerializeResult(cachedMessage), SerializeResult(cachedMessageSsl), std::move(infoEntries)};
}
}
@@ -235,6 +233,7 @@ namespace NDiscoveryPrivate {

THashMap<TString, TVector<TWaiter>> Requested;
bool Scheduled = false;
TMaybe<TString> EndpointId;

auto Request(const TString& database) {
auto result = Requested.emplace(database, TVector<TWaiter>());
@@ -279,7 +278,8 @@

currentCachedMessage = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage(
currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, {}, NameserviceResponse)
currentCachedMessage->InfoEntries, std::move(msg->Updates),
{}, EndpointId.GetOrElse({}), NameserviceResponse)
);

auto it = Requested.find(path);
@@ -293,7 +293,8 @@
const auto& path = msg->Path;

auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, {}, NameserviceResponse)
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries),
{}, EndpointId.GetOrElse({}), NameserviceResponse)
);
newCachedData->Status = msg->Status;

@@ -372,6 +373,11 @@ namespace NDiscoveryPrivate {
}

public:
TDiscoveryCache() = default;
TDiscoveryCache(const TString& endpointId)
: EndpointId(endpointId)
{
}
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR;
}
@@ -523,7 +529,7 @@ class TDiscoverer: public TActorBootstrapped<TDiscoverer> {
return true;
default:
return true;
}
}

void MaybeReply() {
@@ -625,4 +631,8 @@ IActor* CreateDiscoveryCache() {
return new NDiscoveryPrivate::TDiscoveryCache();
}

IActor* CreateDiscoveryCache(const TString& endpointId) {
return new NDiscoveryPrivate::TDiscoveryCache(endpointId);
}

}
1 change: 1 addition & 0 deletions ydb/core/discovery/discovery.h
@@ -76,5 +76,6 @@ IActor* CreateDiscoverer(

// Used to reduce number of requests to Board
IActor* CreateDiscoveryCache();
IActor* CreateDiscoveryCache(const TString& endpointId);

}
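The added overload lets a caller scope a shared discovery cache to a single endpoint id. A minimal usage sketch, mirroring the kafka_proxy changes further down (unqualified names assume using namespace NKikimr; databasePath stands in for the caller's database path; everything else is illustrative):

    // Create a cache that keeps only board entries published with the "KafkaProxy"
    // endpoint id, then point a discoverer at it for the current database.
    TActorId discoveryCache = Register(CreateDiscoveryCache(NGRpcService::KafkaEndpointId));
    Register(CreateDiscoverer(&MakeEndpointsBoardPath, databasePath, SelfId(), discoveryCache));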
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/config_parser.cpp
@@ -67,6 +67,7 @@ void TRunCommandConfigParser::SetupLastGetOptForConfigFiles(NLastGetopt::TOpts&
opts.AddLongOption("grpc-file", "gRPC config file").OptionalArgument("PATH");
opts.AddLongOption("grpc-port", "enable gRPC server on port").RequiredArgument("PORT");
opts.AddLongOption("grpcs-port", "enable gRPC SSL server on port").RequiredArgument("PORT");
opts.AddLongOption("kafka-port", "enable kafka proxy server on port").OptionalArgument("PORT");
opts.AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST");
opts.AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT");
opts.AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT");
@@ -165,6 +166,11 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu
conf.SetSslPort(FromString<ui16>(res.Get("grpcs-port")));
}

if (res.Has("kafka-port")) {
auto& conf = *Config.AppConfig.MutableKafkaProxyConfig();
conf.SetListeningPort(FromString<ui16>(res.Get("kafka-port")));
}

if (res.Has("grpc-public-host")) {
auto& conf = *Config.AppConfig.MutableGRpcConfig();
conf.SetPublicHost(res.Get("grpc-public-host"));
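The new flag sits alongside the existing grpc-port and grpcs-port options. A hypothetical invocation, shown only to illustrate where kafka-port fits (binary name, config path, and the remaining flags depend on the deployment):

    ydbd server --yaml-config /path/to/config.yaml --grpc-port 2135 --kafka-port 9092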
12 changes: 12 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -1714,6 +1714,18 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
endpoints.push_back(std::move(desc));
}

if (Config.GetKafkaProxyConfig().GetEnableKafkaProxy()) {
const auto& kafkaConfig = Config.GetKafkaProxyConfig();
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();
desc->Address = config.GetPublicHost() ? config.GetPublicHost() : address;
desc->Port = kafkaConfig.GetListeningPort();
desc->Ssl = kafkaConfig.HasSslCertificate();

TVector<TString> services = {"datastreams", "pq", "pqv1"};
desc->ServedServices.insert(desc->ServedServices.end(), services.begin(), services.end());
desc->EndpointId = NGRpcService::KafkaEndpointId;
endpoints.push_back(std::move(desc)); // as with the gRPC endpoints above
}

for (auto &sx : config.GetExtEndpoints()) {
const TString &localAddress = sx.GetHost() ? (sx.GetHost() != "[::]" ? sx.GetHost() : FQDNHostName()) : address;
if (const ui32 port = sx.GetPort()) {
1 change: 1 addition & 0 deletions ydb/core/grpc_services/grpc_endpoint.h
@@ -28,5 +28,6 @@ inline TActorId CreateGrpcPublisherServiceActorId() {
return actorId;
}

const static TString KafkaEndpointId = "KafkaProxy";
}
}
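KafkaEndpointId is the value a discovery cache created with CreateDiscoveryCache(endpointId) filters on. A rough sketch of the idea behind the CheckEndpointId predicate used in discovery.cpp (the actual function and board-entry type are not shown in this diff; names below are illustrative):

    // Keep only board entries whose endpoint id matches the one the cache was created with;
    // an empty filter keeps the default (gRPC) endpoints. TEndpointBoardEntry is a stand-in
    // for the real board entry type.
    bool CheckEndpointId(const TString& endpointId, const TEndpointBoardEntry& entry) {
        return entry.GetEndpointId() == endpointId;
    }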
145 changes: 120 additions & 25 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -2,8 +2,11 @@

#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/services/persqueue_v1/actors/schema_actors.h>
#include <ydb/core/grpc_services/grpc_endpoint.h>
#include <ydb/core/base/statestorage.h>

namespace NKafka {
using namespace NKikimr;
using namespace NKikimr::NGRpcProxy::V1;

NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
@@ -19,6 +22,8 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {

if (WithProxy) {
AddProxyNodeToBrokers();
} else {
SendDiscoveryRequest();
}

if (Message->Topics.size() == 0 && !WithProxy) {
@@ -28,22 +33,98 @@
if (Message->Topics.size() != 0) {
ProcessTopics();
}

Become(&TKafkaMetadataActor::StateWork);
RespondIfRequired(ctx);
}

void TKafkaMetadataActor::AddCurrentNodeToBrokers() {
NeedCurrentNode = true;
if (!DiscoveryRequested) {
RequestICNodeCache();
}
}

void TKafkaMetadataActor::SendDiscoveryRequest() {
DiscoveryRequested = true;
if (!DiscoveryCacheActor) {
OwnDiscoveryCache = true;
DiscoveryCacheActor = Register(CreateDiscoveryCache(NGRpcService::KafkaEndpointId));
}
PendingResponses++;
Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
Register(CreateDiscoverer(&MakeEndpointsBoardPath, Context->DatabasePath, SelfId(), DiscoveryCacheActor));
}


void TKafkaMetadataActor::HandleDiscoveryError(TEvDiscovery::TEvError::TPtr& ev) {
PendingResponses--;
if (NeedCurrentNode) {
RequestICNodeCache();
}
DiscoveryRequested = false;
for (auto& [index, ev] : PendingTopicResponses) {
auto& topic = Response->Topics[index];
AddTopicResponse(topic, ev->Get());
}
PendingTopicResponses.clear();
RespondIfRequired(ActorContext());
}

void TKafkaMetadataActor::HandleDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const NActors::TActorContext& ctx) {
PendingResponses--;
ProcessDiscoveryData(ev);
if (NeedCurrentNode) {
auto currNodeIter = Nodes.find(SelfId().NodeId());
if (currNodeIter == Nodes.end()) {
RequestICNodeCache();
} else {
Y_ABORT_UNLESS(!CurrentNodeHostname.empty());
AddBroker(ctx.SelfID.NodeId(), CurrentNodeHostname, currNodeIter->second);
}
}
DiscoveryRequested = false;
for (auto& [index, ev] : PendingTopicResponses) {
auto& topic = Response->Topics[index];
AddTopicResponse(topic, ev->Get());
}
PendingTopicResponses.clear();
RespondIfRequired(ActorContext());
}

bool TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev) {
bool expectSsl = Context->Config.HasSslCertificate();

Ydb::Discovery::ListEndpointsResponse leResponse;
Ydb::Discovery::ListEndpointsResult leResult;
TString const* cachedMessage;
if (expectSsl) {
cachedMessage = &ev->Get()->CachedMessageData->CachedMessageSsl;
} else {
cachedMessage = &ev->Get()->CachedMessageData->CachedMessage;
}
auto ok = leResponse.ParseFromString(*cachedMessage);
if (ok) {
ok = leResponse.operation().result().UnpackTo(&leResult);
}
if (!ok)
return false;

for (auto& endpoint : leResult.endpoints()) {
Nodes.insert({endpoint.node_id(), endpoint.port()});
if (NeedCurrentNode && endpoint.node_id() == SelfId().NodeId()) {
CurrentNodeHostname = endpoint.address();
}
}
return true;
}

void TKafkaMetadataActor::RequestICNodeCache() {
PendingResponses++;
Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
}

void TKafkaMetadataActor::AddProxyNodeToBrokers() {
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = ProxyNodeId;
broker.Host = Context->Config.GetProxy().GetHostname();
broker.Port = Context->Config.GetProxy().GetPort();
Response->Brokers.emplace_back(std::move(broker));
AddBroker(ProxyNodeId, Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort());
}

void TKafkaMetadataActor::ProcessTopics() {
@@ -75,29 +156,32 @@ void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesI
Y_ABORT_UNLESS(!iter.IsEnd());
auto host = (*ev->Get()->Nodes)[iter->second].Host;
KAFKA_LOG_D("Incoming TEvGetAllNodesInfoResponse. Host#: " << host);
AddBroker(ctx.SelfID.NodeId(), host, Context->Config.GetListeningPort());

--PendingResponses;
RespondIfRequired(ctx);
}

void TKafkaMetadataActor::AddBroker(ui64 nodeId, const TString& host, ui64 port) {
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = ctx.SelfID.NodeId();
broker.NodeId = nodeId;
broker.Host = host;
broker.Port = Context->Config.GetListeningPort();
broker.Port = port;
Response->Brokers.emplace_back(std::move(broker));

--PendingResponses;
RespondIfRequired(ctx);
}

TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) {
KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user " << GetUsernameOrAnonymous(Context));
KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << Context->UserToken->GetUserSID() << "'");

TGetPartitionsLocationRequest locationRequest{};
locationRequest.Topic = NormalizePath(Context->DatabasePath, topicRequest.Name.value());
locationRequest.Token = GetUserSerializedToken(Context);
locationRequest.Token = Context->UserToken->GetSerializedToken();
locationRequest.Database = Context->DatabasePath;

PendingResponses++;

return Register(new TPartitionsLocationActor(locationRequest, SelfId()));
}
}

void TKafkaMetadataActor::AddTopicError(
TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode
@@ -130,12 +214,12 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo
if (hostname.StartsWith(UnderlayPrefix)) {
hostname = hostname.substr(sizeof(UnderlayPrefix) - 1);
}
ui64 port = Context->Config.GetListeningPort();
auto nodeIter = Nodes.find(part.NodeId);
if (!nodeIter.IsEnd())
port = nodeIter->second;

auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = part.NodeId;
broker.Host = hostname;
broker.Port = Context->Config.GetListeningPort();
Response->Brokers.emplace_back(std::move(broker));
AddBroker(part.NodeId, hostname, port);
}
}
}
@@ -147,7 +231,7 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
auto* r = ev->Get();
auto actorIter = TopicIndexes.find(ev->Sender);

Y_DEBUG_ABORT_UNLESS(!actorIter.IsEnd());
Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty());

if (actorIter.IsEnd()) {
Expand All @@ -160,28 +244,39 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc

return RespondIfRequired(ctx);
}

for (auto index : actorIter->second) {
auto& topic = Response->Topics[index];
if (r->Status == Ydb::StatusIds::SUCCESS) {
KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful");
AddTopicResponse(topic, r);
if (DiscoveryRequested) {
PendingTopicResponses.insert(std::make_pair(index, ev));
} else {
AddTopicResponse(topic, r);
}
} else {
KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString());
AddTopicError(topic, ConvertErrorCode(r->Status));
}
}

RespondIfRequired(ActorContext());
}

void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
if (PendingResponses == 0) {
if (PendingResponses == 0 && !PendingTopicResponses) {
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, ErrorCode));
Die(ctx);
}
}

void TKafkaMetadataActor::Die(const TActorContext& ctx) {
if (OwnDiscoveryCache) {
Send(DiscoveryCacheActor, new TEvents::TEvPoison());
OwnDiscoveryCache = false;
}
TActor::Die(ctx);
}

TString TKafkaMetadataActor::LogPrefix() const {
return TStringBuilder() << "TKafkaMetadataActor " << SelfId() << " ";
}
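The dispatch entries for HandleDiscoveryData and HandleDiscoveryError are not part of the visible hunks. The actor's StateWork presumably gains something along these lines (a sketch under that assumption; the pre-existing entries are inferred from the handlers above):

    STATEFN(StateWork) {
        switch (ev->GetTypeRewrite()) {
            // New in this change: discovery results populate the node-id -> Kafka-port map.
            HFunc(TEvDiscovery::TEvDiscoveryData, HandleDiscoveryData);
            hFunc(TEvDiscovery::TEvError, HandleDiscoveryError);
            // Pre-existing handlers.
            HFunc(TEvLocationResponse, HandleResponse);
            HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse);
        }
    }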