Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka port assignment through discoveryCache #13197

Merged
merged 11 commits into from
Jan 17, 2025
20 changes: 13 additions & 7 deletions ydb/core/discovery/discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ namespace NDiscovery {
if (!CheckEndpointId(endpointId, entry)) {
continue;
}

if (entry.GetSsl()) {
AddEndpoint(cachedMessageSsl, statesSsl, entry);
} else {
Expand All @@ -200,7 +199,6 @@ namespace NDiscovery {
cachedMessageSsl.set_self_location(location);
}
}

return {SerializeResult(cachedMessage), SerializeResult(cachedMessageSsl), std::move(infoEntries)};
}
}
Expand Down Expand Up @@ -235,6 +233,7 @@ namespace NDiscoveryPrivate {

THashMap<TString, TVector<TWaiter>> Requested;
bool Scheduled = false;
TMaybe<TString> EndpointId;

auto Request(const TString& database) {
auto result = Requested.emplace(database, TVector<TWaiter>());
Expand Down Expand Up @@ -279,7 +278,8 @@ namespace NDiscoveryPrivate {

currentCachedMessage = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage(
currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, {}, NameserviceResponse)
currentCachedMessage->InfoEntries, std::move(msg->Updates),
{}, EndpointId.GetOrElse({}), NameserviceResponse)
);

auto it = Requested.find(path);
Expand All @@ -293,7 +293,8 @@ namespace NDiscoveryPrivate {
const auto& path = msg->Path;

auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, {}, NameserviceResponse)
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries),
{}, EndpointId.GetOrElse({}), NameserviceResponse)
);
newCachedData->Status = msg->Status;

Expand Down Expand Up @@ -372,6 +373,11 @@ namespace NDiscoveryPrivate {
}

public:
TDiscoveryCache() = default;
TDiscoveryCache(const TString& endpointId)
: EndpointId(endpointId)
{
}
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR;
}
Expand Down Expand Up @@ -523,7 +529,7 @@ class TDiscoverer: public TActorBootstrapped<TDiscoverer> {
return true;
default:
return true;
}
}
}

void MaybeReply() {
Expand Down Expand Up @@ -621,8 +627,8 @@ IActor* CreateDiscoverer(
return new TDiscoverer(f, database, replyTo, cacheId);
}

IActor* CreateDiscoveryCache() {
return new NDiscoveryPrivate::TDiscoveryCache();
IActor* CreateDiscoveryCache(const TString& endpointId) {
return new NDiscoveryPrivate::TDiscoveryCache(endpointId);
}

}
2 changes: 1 addition & 1 deletion ydb/core/discovery/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ IActor* CreateDiscoverer(
const TActorId& cacheId);

// Used to reduce number of requests to Board
IActor* CreateDiscoveryCache();
IActor* CreateDiscoveryCache(const TString& endpointId = {});

}
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/config_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void TRunCommandConfigParser::SetupLastGetOptForConfigFiles(NLastGetopt::TOpts&
opts.AddLongOption("grpc-file", "gRPC config file").OptionalArgument("PATH");
opts.AddLongOption("grpc-port", "enable gRPC server on port").RequiredArgument("PORT");
opts.AddLongOption("grpcs-port", "enable gRPC SSL server on port").RequiredArgument("PORT");
opts.AddLongOption("kafka-port", "enable kafka proxy server on port").OptionalArgument("PORT");
opts.AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST");
opts.AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT");
opts.AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT");
Expand Down Expand Up @@ -165,6 +166,11 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu
conf.SetSslPort(FromString<ui16>(res.Get("grpcs-port")));
}

if (res.Has("kafka-port")) {
auto& conf = *Config.AppConfig.MutableKafkaProxyConfig();
conf.SetListeningPort(FromString<ui16>(res.Get("kafka-port")));
}

if (res.Has("grpc-public-host")) {
auto& conf = *Config.AppConfig.MutableGRpcConfig();
conf.SetPublicHost(res.Get("grpc-public-host"));
Expand Down
23 changes: 21 additions & 2 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include <ydb/core/control/immediate_control_board_actor.h>

#include <ydb/core/driver_lib/version/version.h>
#include <ydb/core/discovery/discovery.h>

#include <ydb/core/grpc_services/grpc_mon.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
Expand Down Expand Up @@ -1714,6 +1715,16 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
endpoints.push_back(std::move(desc));
}

if (Config.GetKafkaProxyConfig().GetEnableKafkaProxy()) {
const auto& kakfaConfig = Config.GetKafkaProxyConfig();
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();
desc->Address = config.GetPublicHost() ? config.GetPublicHost() : address;
desc->Port = kakfaConfig.GetListeningPort();
desc->Ssl = kakfaConfig.HasSslCertificate();

desc->EndpointId = NGRpcService::KafkaEndpointId;
FloatingCrowbar marked this conversation as resolved.
Show resolved Hide resolved
}

for (auto &sx : config.GetExtEndpoints()) {
const TString &localAddress = sx.GetHost() ? (sx.GetHost() != "[::]" ? sx.GetHost() : FQDNHostName()) : address;
if (const ui32 port = sx.GetPort()) {
Expand Down Expand Up @@ -2762,10 +2773,18 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
settings.CertificateFile = Config.GetKafkaProxyConfig().GetCert();
settings.PrivateKeyFile = Config.GetKafkaProxyConfig().GetKey();

if (Config.GetKafkaProxyConfig().GetEnableEndpointDiscovery()) {
FloatingCrowbar marked this conversation as resolved.
Show resolved Hide resolved
setup->LocalServices.emplace_back(
NKafka::MakeKafkaDiscoveryCacheID(),
TActorSetupCmd(CreateDiscoveryCache(NGRpcService::KafkaEndpointId),
TMailboxType::HTSwap, appData->UserPoolId)
);
}
setup->LocalServices.emplace_back(
TActorId(),
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()),
TMailboxType::HTSwap, appData->UserPoolId)
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig(),
NKafka::MakeKafkaDiscoveryCacheID()),
TMailboxType::HTSwap, appData->UserPoolId)
);

IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters});
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/grpc_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ inline TActorId CreateGrpcPublisherServiceActorId() {
return actorId;
}

const static TString KafkaEndpointId = "KafkaProxy";
}
}
2 changes: 2 additions & 0 deletions ydb/core/kafka_proxy/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ struct TContext {

NKikimr::NPQ::TRlContext RlContext;

TActorId DiscoveryCacheActor;

bool Authenticated() { return AuthenticationStep == SUCCESS; }
};

Expand Down
Loading
Loading