Skip to content

Commit

Permalink
Kafka port assignment through discoveryCache
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar committed Jan 8, 2025
1 parent a5622d4 commit 15b0d19
Show file tree
Hide file tree
Showing 10 changed files with 455 additions and 19 deletions.
20 changes: 15 additions & 5 deletions ydb/core/discovery/discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ namespace NDiscovery {
if (!CheckEndpointId(endpointId, entry)) {
continue;
}

if (entry.GetSsl()) {
AddEndpoint(cachedMessageSsl, statesSsl, entry);
} else {
Expand All @@ -200,7 +199,6 @@ namespace NDiscovery {
cachedMessageSsl.set_self_location(location);
}
}

return {SerializeResult(cachedMessage), SerializeResult(cachedMessageSsl), std::move(infoEntries)};
}
}
Expand Down Expand Up @@ -235,6 +233,7 @@ namespace NDiscoveryPrivate {

THashMap<TString, TVector<TWaiter>> Requested;
bool Scheduled = false;
TMaybe<TString> EndpointId;

auto Request(const TString& database) {
auto result = Requested.emplace(database, TVector<TWaiter>());
Expand Down Expand Up @@ -279,7 +278,8 @@ namespace NDiscoveryPrivate {

currentCachedMessage = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage(
currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, {}, NameserviceResponse)
currentCachedMessage->InfoEntries, std::move(msg->Updates),
{}, EndpointId.GetOrElse({}), NameserviceResponse)
);

auto it = Requested.find(path);
Expand All @@ -293,7 +293,8 @@ namespace NDiscoveryPrivate {
const auto& path = msg->Path;

auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, {}, NameserviceResponse)
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries),
{}, EndpointId.GetOrElse({}), NameserviceResponse)
);
newCachedData->Status = msg->Status;

Expand Down Expand Up @@ -372,6 +373,11 @@ namespace NDiscoveryPrivate {
}

public:
TDiscoveryCache() = default;
TDiscoveryCache(const TString& endpointId)
: EndpointId(endpointId)
{
}
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR;
}
Expand Down Expand Up @@ -523,7 +529,7 @@ class TDiscoverer: public TActorBootstrapped<TDiscoverer> {
return true;
default:
return true;
}
}
}

void MaybeReply() {
Expand Down Expand Up @@ -625,4 +631,8 @@ IActor* CreateDiscoveryCache() {
return new NDiscoveryPrivate::TDiscoveryCache();
}

IActor* CreateDiscoveryCache(const TString& endpointId) {
return new NDiscoveryPrivate::TDiscoveryCache(endpointId);
}

}
1 change: 1 addition & 0 deletions ydb/core/discovery/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@ IActor* CreateDiscoverer(

// Used to reduce number of requests to Board
IActor* CreateDiscoveryCache();
IActor* CreateDiscoveryCache(const TString& endpointId);

}
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/config_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void TRunCommandConfigParser::SetupLastGetOptForConfigFiles(NLastGetopt::TOpts&
opts.AddLongOption("grpc-file", "gRPC config file").OptionalArgument("PATH");
opts.AddLongOption("grpc-port", "enable gRPC server on port").RequiredArgument("PORT");
opts.AddLongOption("grpcs-port", "enable gRPC SSL server on port").RequiredArgument("PORT");
opts.AddLongOption("kafka-port", "enable kafka proxy server on port").OptionalArgument("PORT");
opts.AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST");
opts.AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT");
opts.AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT");
Expand Down Expand Up @@ -165,6 +166,11 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu
conf.SetSslPort(FromString<ui16>(res.Get("grpcs-port")));
}

if (res.Has("kafka-port")) {
auto& conf = *Config.AppConfig.MutableKafkaProxyConfig();
conf.SetListeningPort(FromString<ui16>(res.Get("kafka-port")));
}

if (res.Has("grpc-public-host")) {
auto& conf = *Config.AppConfig.MutableGRpcConfig();
conf.SetPublicHost(res.Get("grpc-public-host"));
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1714,6 +1714,18 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
endpoints.push_back(std::move(desc));
}

if (Config.GetKafkaProxyConfig().GetEnableKafkaProxy()) {
const auto& kakfaConfig = Config.GetKafkaProxyConfig();
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();
desc->Address = config.GetPublicHost() ? config.GetPublicHost() : address;
desc->Port = kakfaConfig.GetListeningPort();
desc->Ssl = kakfaConfig.HasSslCertificate();

TVector<TString> services = {"datastreams", "pq", "pqv1"};
desc->ServedServices.insert(desc->ServedServices.end(), services.begin(), services.end());
desc->EndpointId = NGRpcService::KafkaEndpointId;
}

for (auto &sx : config.GetExtEndpoints()) {
const TString &localAddress = sx.GetHost() ? (sx.GetHost() != "[::]" ? sx.GetHost() : FQDNHostName()) : address;
if (const ui32 port = sx.GetPort()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/grpc_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ inline TActorId CreateGrpcPublisherServiceActorId() {
return actorId;
}

const static TString KafkaEndpointId = "KafkaProxy";
}
}
106 changes: 96 additions & 10 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/services/persqueue_v1/actors/schema_actors.h>
#include <ydb/core/grpc_services/grpc_endpoint.h>
#include <ydb/core/base/statestorage.h>

namespace NKafka {
using namespace NKikimr;
using namespace NKikimr::NGRpcProxy::V1;

NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
Expand All @@ -17,9 +20,10 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
Response->ClusterId = "ydb-cluster";
Response->ControllerId = 1;


if (WithProxy) {
AddProxyNodeToBrokers();
} else {
SendDiscoveryRequest();
}

if (Message->Topics.size() == 0 && !WithProxy) {
Expand All @@ -29,14 +33,89 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
if (Message->Topics.size() != 0) {
ProcessTopics();
}

Become(&TKafkaMetadataActor::StateWork);
RespondIfRequired(ctx);
}

void TKafkaMetadataActor::AddCurrentNodeToBrokers() {
NeedCurrentNode = true;
if (!DiscoveryRequested) {
RequestICNodeCache();
}
}

void TKafkaMetadataActor::SendDiscoveryRequest() {
DiscoveryRequested = true;
if (!DiscoveryCacheActor) {
DiscoveryCacheActor = Register(CreateDiscoveryCache(NGRpcService::KafkaEndpointId));
}
PendingResponses++;
Register(CreateDiscoverer(&MakeEndpointsBoardPath, Context->DatabasePath, SelfId(), DiscoveryCacheActor));
}


void TKafkaMetadataActor::HandleDiscoveryError(TEvDiscovery::TEvError::TPtr& ev) {
PendingResponses--;
if (NeedCurrentNode) {
RequestICNodeCache();
}
DiscoveryRequested = false;
for (auto& [index, ev] : PendingTopicResponses) {
auto& topic = Response->Topics[index];
AddTopicResponse(topic, ev->Get());
}
PendingTopicResponses.clear();
RespondIfRequired(ActorContext());
}

void TKafkaMetadataActor::HandleDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const NActors::TActorContext& ctx) {
PendingResponses--;
auto res = ProcessDiscoveryData(ev);
if (res) {
RespondIfRequired(ctx);
} else if (NeedCurrentNode) {
RequestICNodeCache();
}
DiscoveryRequested = false;
for (auto& [index, ev] : PendingTopicResponses) {
auto& topic = Response->Topics[index];
AddTopicResponse(topic, ev->Get());
}
PendingTopicResponses.clear();
RespondIfRequired(ActorContext());
}

bool TKafkaMetadataActor::ProcessDiscoveryData(TEvDiscovery::TEvDiscoveryData::TPtr& ev) {
bool expectSsl = Context->Config.HasSslCertificate();

Ydb::Discovery::ListEndpointsResponse leResponse;
Ydb::Discovery::ListEndpointsResult leResult;
TString const* cachedMessage;
if (expectSsl) {
cachedMessage = &ev->Get()->CachedMessageData->CachedMessageSsl;
} else {
cachedMessage = &ev->Get()->CachedMessageData->CachedMessage;
}
auto ok = leResponse.ParseFromString(*cachedMessage);
if (ok) {
ok = leResponse.operation().result().UnpackTo(&leResult);
}
if (!ok)
return false;

for (auto& endpoint : leResult.endpoints()) {
Nodes.insert({endpoint.node_id(), endpoint.port()});
}
if (NeedCurrentNode) {
return Nodes.count(SelfId().NodeId());
}
return true;
}

void TKafkaMetadataActor::RequestICNodeCache() {
PendingResponses++;
Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
}

void TKafkaMetadataActor::AddProxyNodeToBrokers() {
Expand Down Expand Up @@ -98,7 +177,7 @@ TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMeta
PendingResponses++;

return Register(new TPartitionsLocationActor(locationRequest, SelfId()));
}
}

void TKafkaMetadataActor::AddTopicError(
TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode
Expand Down Expand Up @@ -135,7 +214,11 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = part.NodeId;
broker.Host = hostname;
broker.Port = Context->Config.GetListeningPort();
auto nodeIter = Nodes.find(part.NodeId);
if (nodeIter.IsEnd())
broker.Port = Context->Config.GetListeningPort();
else
broker.Port = nodeIter->second;
Response->Brokers.emplace_back(std::move(broker));
}
}
Expand All @@ -148,7 +231,7 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
auto* r = ev->Get();
auto actorIter = TopicIndexes.find(ev->Sender);

Y_DEBUG_ABORT_UNLESS(!actorIter.IsEnd());
Y_DEBUG_ABORT_UNLESS(!actorIter.IsEnd());
Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty());

if (actorIter.IsEnd()) {
Expand All @@ -161,23 +244,26 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc

return RespondIfRequired(ctx);
}

for (auto index : actorIter->second) {
auto& topic = Response->Topics[index];
if (r->Status == Ydb::StatusIds::SUCCESS) {
KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful");
AddTopicResponse(topic, r);
if (DiscoveryRequested) {
PendingTopicResponses.insert(std::make_pair(index, ev));
} else {
AddTopicResponse(topic, r);
}
} else {
KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString());
AddTopicError(topic, ConvertErrorCode(r->Status));
}
}

RespondIfRequired(ActorContext());
}

void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
if (PendingResponses == 0) {
if (PendingResponses == 0 && !PendingTopicResponses) {
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, ErrorCode));
Die(ctx);
}
Expand Down
19 changes: 17 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@
#include <ydb/library/aclib/aclib.h>
#include <ydb/services/persqueue_v1/actors/events.h>
#include <ydb/services/persqueue_v1/actors/schema_actors.h>
#include <ydb/core/discovery/discovery.h>

namespace NKafka {

class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> {
public:
TKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message)
TKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message,
const TActorId& discoveryCacheActor = {})
: Context(context)
, CorrelationId(correlationId)
, Message(message)
, WithProxy(context->Config.HasProxy() && !context->Config.GetProxy().GetHostname().empty())
, Response(new TMetadataResponseData())
, DiscoveryCacheActor(discoveryCacheActor)
{}

void Bootstrap(const NActors::TActorContext& ctx);
Expand All @@ -26,18 +29,24 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
void HandleResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
void HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx);
void HandleDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev, const NActors::TActorContext& ctx);
void HandleDiscoveryError(NKikimr::TEvDiscovery::TEvError::TPtr& ev);

void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response);
void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode);
void RespondIfRequired(const NActors::TActorContext& ctx);
void AddProxyNodeToBrokers();
void AddCurrentNodeToBrokers();
void RequestICNodeCache();
void ProcessTopics();

void SendDiscoveryRequest();
bool ProcessDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev);
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvLocationResponse, HandleResponse);
HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse);
HFunc(NKikimr::TEvDiscovery::TEvDiscoveryData, HandleDiscoveryData);
hFunc(NKikimr::TEvDiscovery::TEvError, HandleDiscoveryError);
}
}

Expand All @@ -55,6 +64,12 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
THashMap<TActorId, TVector<ui64>> TopicIndexes;
THashSet<ui64> AllClusterNodes;
EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR;

TActorId DiscoveryCacheActor;
bool NeedCurrentNode = false;
bool DiscoveryRequested = false;
THashMap<ui64, ui64> Nodes;
TMap<ui64, TEvLocationResponse::TPtr> PendingTopicResponses;
};

} // namespace NKafka
Loading

0 comments on commit 15b0d19

Please sign in to comment.