Skip to content

Commit

Permalink
Add single discovery cache actor
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar committed Jan 10, 2025
1 parent 4863285 commit f29343c
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 30 deletions.
4 changes: 0 additions & 4 deletions ydb/core/discovery/discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,10 +627,6 @@ IActor* CreateDiscoverer(
return new TDiscoverer(f, database, replyTo, cacheId);
}

IActor* CreateDiscoveryCache() {
return new NDiscoveryPrivate::TDiscoveryCache();
}

IActor* CreateDiscoveryCache(const TString& endpointId) {
return new NDiscoveryPrivate::TDiscoveryCache(endpointId);
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/discovery/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ IActor* CreateDiscoverer(
const TActorId& cacheId);

// Used to reduce number of requests to Board
IActor* CreateDiscoveryCache();
IActor* CreateDiscoveryCache(const TString& endpointId);
IActor* CreateDiscoveryCache(const TString& endpointId = {});

}
15 changes: 11 additions & 4 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include <ydb/core/control/immediate_control_board_actor.h>

#include <ydb/core/driver_lib/version/version.h>
#include <ydb/core/discovery/discovery.h>

#include <ydb/core/grpc_services/grpc_mon.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
Expand Down Expand Up @@ -1721,8 +1722,6 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
desc->Port = kakfaConfig.GetListeningPort();
desc->Ssl = kakfaConfig.HasSslCertificate();

TVector<TString> services = {"datastreams", "pq", "pqv1"};
desc->ServedServices.insert(desc->ServedServices.end(), services.begin(), services.end());
desc->EndpointId = NGRpcService::KafkaEndpointId;
}

Expand Down Expand Up @@ -2774,10 +2773,18 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
settings.CertificateFile = Config.GetKafkaProxyConfig().GetCert();
settings.PrivateKeyFile = Config.GetKafkaProxyConfig().GetKey();

if (Config.GetKafkaProxyConfig().GetEnableEndpointDiscovery()) {
setup->LocalServices.emplace_back(
NKafka::MakeKafkaDiscoveryCacheID(),
TActorSetupCmd(CreateDiscoveryCache(NGRpcService::KafkaEndpointId),
TMailboxType::HTSwap, appData->UserPoolId)
);
}
setup->LocalServices.emplace_back(
TActorId(),
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()),
TMailboxType::HTSwap, appData->UserPoolId)
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig(),
NKafka::MakeKafkaDiscoveryCacheID()),
TMailboxType::HTSwap, appData->UserPoolId)
);

IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters});
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kafka_proxy/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ struct TContext {

NKikimr::NPQ::TRlContext RlContext;

TActorId DiscoveryCacheActor;

bool Authenticated() { return AuthenticationStep == SUCCESS; }
};

Expand Down
31 changes: 18 additions & 13 deletions ydb/core/kafka_proxy/kafka_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,20 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
TKafkaConnection(const TActorId& listenerActorId,
TIntrusivePtr<TSocketDescriptor> socket,
TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config)
const NKikimrConfig::TKafkaProxyConfig& config,
const TActorId& discoveryCacheActorId)
: ListenerActorId(listenerActorId)
, Socket(std::move(socket))
, Address(address)
, Buffer(Socket.Get(), config.GetPacketSize())
, Step(SIZE_READ)
, Demand(NoDemand)
, InflightSize(0)
, Context(std::make_shared<TContext>(config)) {
, Context(std::make_shared<TContext>(config))
{
SetNonBlock();
IsSslRequired = Socket->IsSslSupported();
Context->DiscoveryCacheActor = discoveryCacheActorId;
}

void Bootstrap() {
Expand Down Expand Up @@ -318,7 +321,7 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
TMessagePtr<T> Cast(std::shared_ptr<Msg>& request) {
return TMessagePtr<T>(request->Buffer, request->Message);
}

bool ProcessRequest(const TActorContext& ctx) {
KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize
<< ", Size=" << Request->Size);
Expand All @@ -339,7 +342,7 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
if (Request->Header.ClientId.has_value() && Request->Header.ClientId != "") {
Context->KafkaClient = Request->Header.ClientId.value();
}

switch (Request->Header.RequestApiKey) {
case PRODUCE:
HandleMessage(&Request->Header, Cast<TProduceRequestData>(Request), ctx);
Expand Down Expand Up @@ -372,7 +375,7 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
case FETCH:
HandleMessage(&Request->Header, Cast<TFetchRequestData>(Request));
break;

case JOIN_GROUP:
HandleMessage(&Request->Header, Cast<TJoinGroupRequestData>(Request), ctx);
break;
Expand All @@ -388,11 +391,11 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
case HEARTBEAT:
HandleMessage(&Request->Header, Cast<THeartbeatRequestData>(Request), ctx);
break;

case FIND_COORDINATOR:
HandleMessage(&Request->Header, Cast<TFindCoordinatorRequestData>(Request));
break;

case OFFSET_FETCH:
HandleMessage(&Request->Header, Cast<TOffsetFetchRequestData>(Request));
break;
Expand Down Expand Up @@ -481,7 +484,7 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
void HandleKillReadSession() {
if (ReadSessionActorId) {
Send(ReadSessionActorId, new TEvents::TEvPoison());

TActorId emptyActor;
ReadSessionActorId = emptyActor;
}
Expand Down Expand Up @@ -714,13 +717,14 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet

bool RequireAuthentication(EApiKey apiKey) {
bool configuredToAuthenticate = NKikimr::AppData()->EnforceUserTokenRequirement;
bool apiKeyRequiresAuthentication = !(EApiKey::API_VERSIONS == apiKey ||
EApiKey::SASL_HANDSHAKE == apiKey ||
bool apiKeyRequiresAuthentication = !(EApiKey::API_VERSIONS == apiKey ||
EApiKey::SASL_HANDSHAKE == apiKey ||
EApiKey::SASL_AUTHENTICATE == apiKey);

return configuredToAuthenticate && apiKeyRequiresAuthentication;
}


void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) {
if (event->Get()->Read) {
if (!CloseConnection) {
Expand Down Expand Up @@ -784,8 +788,9 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId,
TIntrusivePtr<TSocketDescriptor> socket,
TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config) {
return new TKafkaConnection(listenerActorId, std::move(socket), std::move(address), config);
const NKikimrConfig::TKafkaProxyConfig& config,
const TActorId& discoveryCacheActorId) {
return new TKafkaConnection(listenerActorId, std::move(socket), std::move(address), config, discoveryCacheActorId);
}

} // namespace NKafka
3 changes: 2 additions & 1 deletion ydb/core/kafka_proxy/kafka_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ using namespace NKikimr::NRawSocket;
NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId,
TIntrusivePtr<TSocketDescriptor> socket,
TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config);
const NKikimrConfig::TKafkaProxyConfig& config,
const TActorId& discoveryCacheActorId);

} // namespace NKafka
13 changes: 11 additions & 2 deletions ydb/core/kafka_proxy/kafka_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,20 @@ namespace NKafka {

using namespace NKikimr::NRawSocket;

inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config) {

TActorId MakeKafkaDiscoveryCacheID() {
static const char x[12] = "kafka_dsc_c";
return TActorId(0, TStringBuf(x, 12));
}

inline NActors::IActor* CreateKafkaListener(
const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config,
const TActorId& discoveryCacheActorId
) {
return CreateSocketListener(
poller, settings,
[=](const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
return CreateKafkaConnection(listenerActorId, socket, address, config);
return CreateKafkaConnection(listenerActorId, socket, address, config, discoveryCacheActorId);
},
NKikimrServices::EServiceKikimr::KAFKA_PROXY, EErrorAction::Abort);
}
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,8 @@ namespace Tests {
settings.SslCertificatePem = Settings->AppConfig->GetKafkaProxyConfig().GetSslCertificate();
}

IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig());
IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig(),
TActorId{});
TActorId actorId = Runtime->Register(actor, nodeIdx, userPoolId);
Runtime->RegisterService(TActorId{}, actorId, nodeIdx);

Expand All @@ -1123,9 +1124,6 @@ namespace Tests {
rootDomains.emplace_back("/" + domain->Name);
}
desc->ServedDatabases.insert(desc->ServedDatabases.end(), rootDomains.begin(), rootDomains.end());

TVector<TString> grpcServices = {"datastreams", "pq", "pqv1"};
desc->ServedServices.insert(desc->ServedServices.end(), grpcServices.begin(), grpcServices.end());
Runtime->GetActorSystem(0)->Register(NGRpcService::CreateGrpcEndpointPublishActor(desc.Get()), TMailboxType::ReadAsFilled, appData.UserPoolId);
}
}
Expand Down

0 comments on commit f29343c

Please sign in to comment.