This repository was archived by the owner on Feb 12, 2022. It is now read-only.

Workaround for statistics memory leak #1

Open · wants to merge 1 commit into base: master
11 changes: 11 additions & 0 deletions src/Kafka/Kafka.Client/Cfg/StatSettings.cs
@@ -0,0 +1,11 @@
namespace Kafka.Client.Cfg
{
public static class StatSettings
{
public static volatile bool ConsumerStatsEnabled = false;

public static volatile bool ProducerStatsEnabled = false;

public static volatile bool FetcherThreadStatsEnabled = false;
}
}
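
For context (not part of this diff): with the defaults above, every stats code path guarded in the files below is skipped, which is the workaround for the metric objects that keep accumulating. An application that still wants the metrics can opt back in at startup. A minimal sketch; the Program class and the choice to enable all three flags are illustrative only:

using Kafka.Client.Cfg;

public static class Program
{
    public static void Main()
    {
        // All three flags default to false after this change, so no stats
        // objects are created per request. Re-enable explicitly only if the
        // metrics are actually read somewhere.
        StatSettings.ConsumerStatsEnabled = true;
        StatSettings.ProducerStatsEnabled = true;
        StatSettings.FetcherThreadStatsEnabled = true;

        // ... construct producers / consumers as usual ...
    }
}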
23 changes: 19 additions & 4 deletions src/Kafka/Kafka.Client/Consumers/SimpleConsumer.cs
@@ -1,4 +1,6 @@
namespace Kafka.Client.Consumers
using Kafka.Client.Cfg;

namespace Kafka.Client.Consumers
{
using System;
using System.Collections.Generic;
@@ -137,9 +139,22 @@ public TopicMetadataResponse Send(TopicMetadataRequest request)
internal FetchResponse Fetch(FetchRequest request)
{
Receive response = null;
var specificTimer = this.fetchRequestAndResponseStats.GetFetchRequestAndResponseStats(this.BrokerInfo).RequestTimer;
var aggregateTimer = this.fetchRequestAndResponseStats.GetFetchRequestAndResponseAllBrokersStats().RequestTimer;
aggregateTimer.Time(() => specificTimer.Time(() => { response = this.SendRequest(request); }));
if (StatSettings.ConsumerStatsEnabled)
{
var specificTimer =
this.fetchRequestAndResponseStats.GetFetchRequestAndResponseStats(this.BrokerInfo).RequestTimer;
var aggregateTimer =
this.fetchRequestAndResponseStats.GetFetchRequestAndResponseAllBrokersStats().RequestTimer;
aggregateTimer.Time(() => specificTimer.Time(() =>
{
response = this.SendRequest(request);

}));
}
else
{
response = this.SendRequest(request);
}

var fetchResponse = FetchResponse.ReadFrom(response.Buffer);
var fetchedSize = fetchResponse.SizeInBytes;
1 change: 1 addition & 0 deletions src/Kafka/Kafka.Client/Kafka.Client.csproj
@@ -142,6 +142,7 @@
<Compile Include="Api\TopicMetadata.cs" />
<Compile Include="Api\TopicMetadataRequest.cs" />
<Compile Include="Api\TopicMetadataResponse.cs" />
<Compile Include="Cfg\StatSettings.cs" />
<Compile Include="Common\Config.cs" />
<Compile Include="Consumers\TopicEventHandler.cs" />
<Compile Include="Extensions\DictionaryExtensions.cs" />
26 changes: 18 additions & 8 deletions src/Kafka/Kafka.Client/Producers/SyncProducer.cs
@@ -1,4 +1,6 @@
namespace Kafka.Client.Producers
using Kafka.Client.Cfg;

namespace Kafka.Client.Producers
{
using System;
using System.IO;
@@ -92,18 +94,26 @@ public Receive DoSend(RequestOrResponse request, bool readResponse = true)

public ProducerResponse Send(ProducerRequest producerRequest)
{
var requestSize = producerRequest.SizeInBytes;
this.producerRequestStats.GetProducerRequestStats(this.BrokerInfo).RequestSizeHist.Update(requestSize);
this.producerRequestStats.GetProducerRequestAllBrokersStats().RequestSizeHist.Update(requestSize);

Receive response = null;
var specificTimer = this.producerRequestStats.GetProducerRequestStats(this.BrokerInfo).RequestTimer;
var aggregateTimer = this.producerRequestStats.GetProducerRequestAllBrokersStats().RequestTimer;

aggregateTimer.Time(() => specificTimer.Time(() =>
if (StatSettings.ProducerStatsEnabled)
{
var requestSize = producerRequest.SizeInBytes;
this.producerRequestStats.GetProducerRequestStats(this.BrokerInfo).RequestSizeHist.Update(requestSize);
this.producerRequestStats.GetProducerRequestAllBrokersStats().RequestSizeHist.Update(requestSize);

var specificTimer = this.producerRequestStats.GetProducerRequestStats(this.BrokerInfo).RequestTimer;
var aggregateTimer = this.producerRequestStats.GetProducerRequestAllBrokersStats().RequestTimer;

aggregateTimer.Time(() => specificTimer.Time(() =>
{
response = this.DoSend(producerRequest, producerRequest.RequiredAcks != 0);
}));
}
else
{
response = this.DoSend(producerRequest, producerRequest.RequiredAcks != 0);
}

if (producerRequest.RequiredAcks != 0)
{
18 changes: 13 additions & 5 deletions src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs
@@ -1,4 +1,6 @@
namespace Kafka.Client.Server
using Kafka.Client.Cfg;

namespace Kafka.Client.Server
{
using System;
using System.Collections.Generic;
@@ -201,17 +203,23 @@ public void ProcessFetchRequest(FetchRequest fetchRequest)
try
{
var messages = (ByteBufferMessageSet)partitionData.Messages;
var validBytes = messages.ValidBytes;
var messageAndOffset =
messages.ShallowIterator().ToEnumerable().LastOrDefault();
var newOffset = messageAndOffset != null
? messageAndOffset.NextOffset
: currentOffset;

this.partitionMap[topicAndPartition] = newOffset;
this.FetcherLagStats.GetFetcherLagStats(topic, partitionId).Lag = partitionData.Hw
- newOffset;
this.FetcherStats.ByteRate.Mark(validBytes);

if (StatSettings.FetcherThreadStatsEnabled)
{
var validBytes = messages.ValidBytes;

this.FetcherLagStats.GetFetcherLagStats(topic, partitionId).Lag = partitionData.Hw
- newOffset;
this.FetcherStats.ByteRate.Mark(validBytes);
}

// Once we hand off the partition Data to the subclass, we can't mess with it any more in this thread
this.ProcessPartitionData(topicAndPartition, currentOffset, partitionData);
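
Because the flags are static volatile fields that the guarded call sites read on every request, they can also be flipped at runtime rather than only at startup, for example to sample producer metrics for a short window and then return to the leak-avoiding default. A hedged sketch; StatsSampling and SampleProducerStats are hypothetical helpers, not part of this change:

using System;
using System.Threading;
using Kafka.Client.Cfg;

public static class StatsSampling
{
    // Hypothetical helper: turn producer stats on for a bounded window, then
    // restore the default. Uses only the volatile flags added in this PR.
    public static void SampleProducerStats(TimeSpan window)
    {
        StatSettings.ProducerStatsEnabled = true;   // volatile write, visible to request threads
        Thread.Sleep(window);                       // let SyncProducer.Send record timings and sizes
        StatSettings.ProducerStatsEnabled = false;  // back to the default introduced here
    }
}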