Skip to content

Commit

Permalink
Improvements after review
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar committed Jan 13, 2025
1 parent 9b57613 commit fd80112
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 28 deletions.
14 changes: 7 additions & 7 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,8 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
desc->Ssl = kakfaConfig.HasSslCertificate();

desc->EndpointId = NGRpcService::KafkaEndpointId;
endpoints.push_back(std::move(desc));

}

for (auto &sx : config.GetExtEndpoints()) {
Expand Down Expand Up @@ -2773,13 +2775,11 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
settings.CertificateFile = Config.GetKafkaProxyConfig().GetCert();
settings.PrivateKeyFile = Config.GetKafkaProxyConfig().GetKey();

if (Config.GetKafkaProxyConfig().GetEnableEndpointDiscovery()) {
setup->LocalServices.emplace_back(
NKafka::MakeKafkaDiscoveryCacheID(),
TActorSetupCmd(CreateDiscoveryCache(NGRpcService::KafkaEndpointId),
TMailboxType::HTSwap, appData->UserPoolId)
);
}
setup->LocalServices.emplace_back(
NKafka::MakeKafkaDiscoveryCacheID(),
TActorSetupCmd(CreateDiscoveryCache(NGRpcService::KafkaEndpointId),
TMailboxType::HTSwap, appData->UserPoolId)
);
setup->LocalServices.emplace_back(
TActorId(),
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig(),
Expand Down
24 changes: 6 additions & 18 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ using namespace NKikimr::NGRpcProxy::V1;

NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
const ui64 correlationId,
const TMessagePtr<TMetadataRequestData>& message) {
return new TKafkaMetadataActor(context, correlationId, message);
const TMessagePtr<TMetadataRequestData>& message,
const TActorId& discoveryCacheActor) {
return new TKafkaMetadataActor(context, correlationId, message, discoveryCacheActor);
}

void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
Expand All @@ -23,10 +24,7 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
if (WithProxy) {
AddProxyNodeToBrokers();
} else {
if (Context->Config.GetEnableEndpointDiscovery())
SendDiscoveryRequest();
else
RequestICNodeCache();
SendDiscoveryRequest();

if (Message->Topics.size() == 0) {
NeedCurrentNode = true;
Expand All @@ -42,10 +40,7 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
}

void TKafkaMetadataActor::SendDiscoveryRequest() {
if (!DiscoveryCacheActor) {
OwnDiscoveryCache = true;
DiscoveryCacheActor = Register(CreateDiscoveryCache(NGRpcService::KafkaEndpointId));
}
Y_VERIFY_DEBUG(DiscoveryCacheActor);
PendingResponses++;
Register(CreateDiscoverer(&MakeEndpointsBoardPath, Context->DatabasePath, SelfId(), DiscoveryCacheActor));
}
Expand Down Expand Up @@ -270,6 +265,7 @@ void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
return;
}
AddBroker(nodeIter->first, nodeIter->second.Host, nodeIter->second.Port);
NeedCurrentNode = false;
}
while (!PendingTopicResponses.empty()) {
auto& [index, ev] = *PendingTopicResponses.begin();
Expand All @@ -293,14 +289,6 @@ void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
Respond();
}

void TKafkaMetadataActor::Die(const TActorContext& ctx) {
if (OwnDiscoveryCache) {
Send(DiscoveryCacheActor, new TEvents::TEvPoison());
OwnDiscoveryCache = false;
}
TActor::Die(ctx);
}

TString TKafkaMetadataActor::LogPrefix() const {
return TStringBuilder() << "TKafkaMetadataActor " << SelfId() << " ";
}
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace NKafka {
class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> {
public:
TKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message,
const TActorId& discoveryCacheActor = {})
const TActorId& discoveryCacheActor)
: Context(context)
, CorrelationId(correlationId)
, Message(message)
Expand Down Expand Up @@ -60,7 +60,6 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
}

TString LogPrefix() const;
void Die(const TActorContext& ctx) override;

private:
const TContext::TPtr Context;
Expand All @@ -76,7 +75,6 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR;

TActorId DiscoveryCacheActor;
bool OwnDiscoveryCache = false;
bool NeedCurrentNode = false;
bool HaveError = false;
bool FallbackToIcDiscovery = false;
Expand Down

0 comments on commit fd80112

Please sign in to comment.