diff --git a/src/Kafka/Kafka.Client/Common/ClientIdAndBroker.cs b/src/Kafka/Kafka.Client/Common/ClientIdAndBroker.cs
index c2d670b..c6d26c3 100644
--- a/src/Kafka/Kafka.Client/Common/ClientIdAndBroker.cs
+++ b/src/Kafka/Kafka.Client/Common/ClientIdAndBroker.cs
@@ -1,24 +1,57 @@
-namespace Kafka.Client.Common
-{
- ///
- /// Convenience case class since (clientId, brokerInfo) pairs are used to create
- /// SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
- ///
- public class ClientIdAndBroker
- {
- public string ClientId { get; private set; }
-
- public string BrokerInfo { get; private set; }
-
- public ClientIdAndBroker(string clientId, string brokerInfo)
- {
- this.ClientId = clientId;
- this.BrokerInfo = brokerInfo;
- }
-
- public override string ToString()
- {
- return string.Format("{0}-{1}", this.ClientId, this.BrokerInfo);
- }
- }
+namespace Kafka.Client.Common
+{
+ ///
+ /// Convenience case class since (clientId, brokerInfo) pairs are used to create
+ /// SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
+ ///
+ public class ClientIdAndBroker
+ {
+ public string ClientId { get; private set; }
+
+ public string BrokerInfo { get; private set; }
+
+ public ClientIdAndBroker(string clientId, string brokerInfo)
+ {
+ this.ClientId = clientId;
+ this.BrokerInfo = brokerInfo;
+ }
+
+ public override string ToString()
+ {
+ return string.Format("{0}-{1}", this.ClientId, this.BrokerInfo);
+ }
+
+ protected bool Equals(ClientIdAndBroker other)
+ {
+ return this.ClientId == other.ClientId && this.BrokerInfo == other.BrokerInfo;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ if (obj.GetType() != this.GetType())
+ {
+ return false;
+ }
+
+ return this.Equals((ClientIdAndBroker)obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return ((this.ClientId != null ? this.ClientId.GetHashCode() : 0) * 397) ^ (this.BrokerInfo != null ? this.BrokerInfo.GetHashCode() : 0);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs b/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs
index b215733..90a4a66 100644
--- a/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs
+++ b/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs
@@ -1,432 +1,473 @@
-namespace Kafka.Client.Server
-{
- using System;
- using System.Collections.Generic;
- using System.Linq;
-
- using Kafka.Client.Api;
- using Kafka.Client.Clusters;
- using Kafka.Client.Common;
- using Kafka.Client.Common.Imported;
- using Kafka.Client.Consumers;
- using Kafka.Client.Extensions;
- using Kafka.Client.Messages;
- using Kafka.Client.Utils;
-
- using Spring.Threading.Locks;
-
- internal abstract class AbstractFetcherThread : ShutdownableThread
- {
- private string clientId;
-
- private Broker sourceBroker;
-
- private int socketTimeout;
-
- private int socketBufferSize;
-
- private int fetchSize;
-
- private int fetcherBrokerId;
-
- private int maxWait;
-
- private int minBytes;
-
- private readonly IDictionary partitionMap = new Dictionary();
-
- private readonly ReentrantLock partitionMapLock;
-
- private readonly ICondition partitionMapCond;
-
- protected readonly SimpleConsumer simpleConsumer;
-
- private readonly string brokerInfo;
-
- private readonly ClientIdAndBroker metricId;
-
- public FetcherStats FetcherStats { get; private set; }
-
- public FetcherLagStats FetcherLagStats { get; private set; }
-
- private readonly FetchRequestBuilder fetchRequestBuilder;
-
- internal AbstractFetcherThread(
- string name,
- string clientId,
- Broker sourceBroker,
- int socketTimeout,
- int socketBufferSize,
- int fetchSize,
- int fetcherBrokerId = -1,
- int maxWait = 0,
- int minBytes = 1,
- bool isInterruptible = true)
- : base(name, isInterruptible)
- {
- this.clientId = clientId;
- this.sourceBroker = sourceBroker;
- this.socketTimeout = socketTimeout;
- this.socketBufferSize = socketBufferSize;
- this.fetchSize = fetchSize;
- this.fetcherBrokerId = fetcherBrokerId;
- this.maxWait = maxWait;
- this.minBytes = minBytes;
-
- this.partitionMapLock = new ReentrantLock();
- this.partitionMapCond = this.partitionMapLock.NewCondition();
- this.simpleConsumer = new SimpleConsumer(
- sourceBroker.Host, sourceBroker.Port, socketTimeout, socketBufferSize, clientId);
- this.brokerInfo = string.Format("host_{0}-port_{1}", sourceBroker.Host, sourceBroker.Port);
-
- this.metricId = new ClientIdAndBroker(clientId, this.brokerInfo);
-
- this.FetcherStats = new FetcherStats(this.metricId);
- this.FetcherLagStats = new FetcherLagStats(this.metricId);
- this.fetchRequestBuilder =
- new FetchRequestBuilder().ClientId(clientId)
- .ReplicaId(fetcherBrokerId)
- .MaxWait(maxWait)
- .MinBytes(minBytes);
- }
-
- ///
- /// process fetched Data
- ///
- ///
- ///
- ///
- public abstract void ProcessPartitionData(
- TopicAndPartition topicAndPartition, long fetchOffset, FetchResponsePartitionData partitionData);
-
- ///
- /// handle a partition whose offset is out of range and return a new fetch offset
- ///
- ///
- ///
- public abstract long HandleOffsetOutOfRange(TopicAndPartition topicAndPartition);
-
- ///
- /// deal with partitions with errors, potentially due to leadership changes
- ///
- ///
- public abstract void HandlePartitionsWithErrors(IEnumerable partitions);
-
- public override void Shutdown()
- {
- base.Shutdown();
- this.simpleConsumer.Close();
- }
-
- public override void DoWork()
- {
- this.partitionMapLock.Lock();
- try
- {
- if (this.partitionMap.Count == 0)
- {
- this.partitionMapCond.Await(TimeSpan.FromMilliseconds(200));
- }
-
- foreach (var topicAndOffset in this.partitionMap)
- {
- var topicAndPartition = topicAndOffset.Key;
- var offset = topicAndOffset.Value;
- this.fetchRequestBuilder.AddFetch(topicAndPartition.Topic, topicAndPartition.Partiton, offset, this.fetchSize);
- }
- }
- finally
- {
- this.partitionMapLock.Unlock();
- }
-
- var fetchRequest = this.fetchRequestBuilder.Build();
- if (fetchRequest.RequestInfo.Count > 0)
- {
- this.ProcessFetchRequest(fetchRequest);
- }
- }
-
- public void ProcessFetchRequest(FetchRequest fetchRequest)
- {
- var partitionsWithError = new HashSet();
- FetchResponse response = null;
- try
- {
- Logger.DebugFormat("issuing to broker {0} of fetch request {1}", this.sourceBroker.Id, fetchRequest);
- response = this.simpleConsumer.Fetch(fetchRequest);
- }
- catch (Exception e)
- {
- if (isRunning.Get())
- {
- Logger.Error("Error in fetch " + fetchRequest, e);
- this.partitionMapLock.Lock();
- try
- {
- foreach (var key in this.partitionMap.Keys)
- {
- partitionsWithError.Add(key);
- }
- }
- finally
- {
- this.partitionMapLock.Unlock();
- }
- }
- }
-
- this.FetcherStats.RequestRate.Mark();
-
- if (response != null)
- {
- // process fetched Data
- this.partitionMapLock.Lock();
- try
- {
- foreach (var topicAndData in response.Data)
- {
- var topicAndPartition = topicAndData.Key;
- var partitionData = topicAndData.Value;
- var topic = topicAndPartition.Topic;
- var partitionId = topicAndPartition.Partiton;
- long currentOffset;
- if (this.partitionMap.TryGetValue(topicAndPartition, out currentOffset)
- && fetchRequest.RequestInfo[topicAndPartition].Offset == currentOffset)
- {
- // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
- switch (partitionData.Error)
- {
- case ErrorMapping.NoError:
- try
- {
- var messages = (ByteBufferMessageSet)partitionData.Messages;
- var validBytes = messages.ValidBytes;
- var messageAndOffset =
- messages.ShallowIterator().ToEnumerable().LastOrDefault();
- var newOffset = messageAndOffset != null
- ? messageAndOffset.NextOffset
- : currentOffset;
-
- this.partitionMap[topicAndPartition] = newOffset;
- this.FetcherLagStats.GetFetcherLagStats(topic, partitionId).Lag = partitionData.Hw
- - newOffset;
- this.FetcherStats.ByteRate.Mark(validBytes);
-
- // Once we hand off the partition Data to the subclass, we can't mess with it any more in this thread
- this.ProcessPartitionData(topicAndPartition, currentOffset, partitionData);
- }
- catch (InvalidMessageException ime)
- {
- // we log the error and continue. This ensures two things
- // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
- // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
- // should get fixed in the subsequent fetches
- Logger.ErrorFormat(
- "Found invalid messages during fetch for partiton [{0},{1}] offset {2} error {3}",
- topic,
- partitionId,
- currentOffset,
- ime.Message);
- }
- catch (Exception e)
- {
- throw new KafkaException(
- string.Format(
- "error processing Data for partition [{0},{1}] offset {2}",
- topic,
- partitionId,
- currentOffset),
- e);
- }
-
- break;
- case ErrorMapping.OffsetOutOfRangeCode:
- try
- {
- var newOffset = this.HandleOffsetOutOfRange(topicAndPartition);
- this.partitionMap[topicAndPartition] = newOffset;
- Logger.ErrorFormat(
- "Current offset {0} for partiton [{1},{2}] out of range; reste offset to {3}",
- currentOffset,
- topic,
- partitionId,
- newOffset);
- }
- catch (Exception e)
- {
- Logger.Error(
- string.Format(
- "Error getting offset for partiton [{0},{1}] to broker {2}",
- topic,
- partitionId,
- sourceBroker.Id),
- e);
- partitionsWithError.Add(topicAndPartition);
- }
-
- break;
- default:
- if (isRunning.Get())
- {
- Logger.ErrorFormat(
- "Error for partition [{0},{1}] to broker {2}:{3}",
- topic,
- partitionId,
- this.sourceBroker.Id,
- ErrorMapping.ExceptionFor(partitionData.Error).GetType().Name);
- partitionsWithError.Add(topicAndPartition);
- }
-
- break;
- }
- }
- }
- }
- finally
- {
- this.partitionMapLock.Unlock();
- }
- }
-
- if (partitionsWithError.Count > 0)
- {
- Logger.DebugFormat("handling partitions with error for {0}", string.Join(",", partitionsWithError));
- this.HandlePartitionsWithErrors(partitionsWithError);
- }
- }
-
- public void AddPartitions(IDictionary partitionAndOffsets)
- {
- this.partitionMapLock.LockInterruptibly();
- try
- {
- foreach (var topicAndOffset in partitionAndOffsets)
- {
- var topicAndPartition = topicAndOffset.Key;
- var offset = topicAndOffset.Value;
-
- // If the partitionMap already has the topic/partition, then do not update the map with the old offset
- if (!this.partitionMap.ContainsKey(topicAndPartition))
- {
- this.partitionMap[topicAndPartition] = PartitionTopicInfo.IsOffsetInvalid(offset)
- ? this.HandleOffsetOutOfRange(topicAndPartition)
- : offset;
- }
-
- this.partitionMapCond.SignalAll();
- }
- }
- finally
- {
- this.partitionMapLock.Unlock();
- }
- }
-
- public void RemovePartitions(ISet topicAndPartitions)
- {
- this.partitionMapLock.LockInterruptibly();
- try
- {
- foreach (var tp in topicAndPartitions)
- {
- this.partitionMap.Remove(tp);
- }
- }
- finally
- {
- this.partitionMapLock.Unlock();
- }
- }
-
- public int PartitionCount()
- {
- this.partitionMapLock.LockInterruptibly();
- try
- {
- return this.partitionMap.Count;
- }
- finally
- {
- this.partitionMapLock.Unlock();
- }
- }
- }
-
- internal class FetcherLagMetrics
- {
- private readonly AtomicLong lagVal = new AtomicLong(-1);
-
- public FetcherLagMetrics(ClientIdBrokerTopicPartition metricId)
- {
- MetersFactory.NewGauge(metricId + "-ConsumerLag", () => this.lagVal.Get());
- }
-
- internal long Lag
- {
- get
- {
- return this.lagVal.Get();
- }
-
- set
- {
- this.lagVal.Set(value);
- }
- }
- }
-
- internal class FetcherLagStats
- {
- private readonly ClientIdAndBroker metricId;
-
- private readonly Func valueFactory;
-
- public Pool Stats { get; private set; }
-
- public FetcherLagStats(ClientIdAndBroker metricId)
- {
- this.metricId = metricId;
- this.valueFactory = k => new FetcherLagMetrics(k);
- this.Stats = new Pool(this.valueFactory);
- }
-
- internal FetcherLagMetrics GetFetcherLagStats(string topic, int partitionId)
- {
- return this.Stats.GetAndMaybePut(
- new ClientIdBrokerTopicPartition(this.metricId.ClientId, this.metricId.BrokerInfo, topic, partitionId));
- }
- }
-
- internal class FetcherStats
- {
- public FetcherStats(ClientIdAndBroker metricId)
- {
- this.RequestRate = MetersFactory.NewMeter(metricId + "-RequestsPerSec", "requests", TimeSpan.FromSeconds(1));
- this.ByteRate = MetersFactory.NewMeter(metricId + "-BytesPerSec", "bytes", TimeSpan.FromSeconds(1));
- }
-
- internal IMeter RequestRate { get; private set; }
-
- internal IMeter ByteRate { get; private set; }
- }
-
- internal class ClientIdBrokerTopicPartition
- {
- public string ClientId { get; private set; }
-
- public string BrokerInfo { get; private set; }
-
- public string Topic { get; private set; }
-
- public int PartitonId { get; private set; }
-
- public ClientIdBrokerTopicPartition(string clientId, string brokerInfo, string topic, int partitonId)
- {
- this.ClientId = clientId;
- this.BrokerInfo = brokerInfo;
- this.Topic = topic;
- this.PartitonId = partitonId;
- }
- }
+namespace Kafka.Client.Server
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+
+ using Kafka.Client.Api;
+ using Kafka.Client.Clusters;
+ using Kafka.Client.Common;
+ using Kafka.Client.Common.Imported;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Extensions;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Utils;
+
+ using Spring.Threading.Locks;
+
+ internal abstract class AbstractFetcherThread : ShutdownableThread
+ {
+ private string clientId;
+
+ private Broker sourceBroker;
+
+ private int socketTimeout;
+
+ private int socketBufferSize;
+
+ private int fetchSize;
+
+ private int fetcherBrokerId;
+
+ private int maxWait;
+
+ private int minBytes;
+
+ private readonly IDictionary partitionMap = new Dictionary();
+
+ private readonly ReentrantLock partitionMapLock;
+
+ private readonly ICondition partitionMapCond;
+
+ protected readonly SimpleConsumer simpleConsumer;
+
+ private readonly string brokerInfo;
+
+ private readonly ClientIdAndBroker metricId;
+
+ public FetcherStats FetcherStats { get; private set; }
+
+ public FetcherLagStats FetcherLagStats { get; private set; }
+
+ private readonly FetchRequestBuilder fetchRequestBuilder;
+
+ internal AbstractFetcherThread(
+ string name,
+ string clientId,
+ Broker sourceBroker,
+ int socketTimeout,
+ int socketBufferSize,
+ int fetchSize,
+ int fetcherBrokerId = -1,
+ int maxWait = 0,
+ int minBytes = 1,
+ bool isInterruptible = true)
+ : base(name, isInterruptible)
+ {
+ this.clientId = clientId;
+ this.sourceBroker = sourceBroker;
+ this.socketTimeout = socketTimeout;
+ this.socketBufferSize = socketBufferSize;
+ this.fetchSize = fetchSize;
+ this.fetcherBrokerId = fetcherBrokerId;
+ this.maxWait = maxWait;
+ this.minBytes = minBytes;
+
+ this.partitionMapLock = new ReentrantLock();
+ this.partitionMapCond = this.partitionMapLock.NewCondition();
+ this.simpleConsumer = new SimpleConsumer(
+ sourceBroker.Host, sourceBroker.Port, socketTimeout, socketBufferSize, clientId);
+ this.brokerInfo = string.Format("host_{0}-port_{1}", sourceBroker.Host, sourceBroker.Port);
+
+ this.metricId = new ClientIdAndBroker(clientId, this.brokerInfo);
+
+ this.FetcherStats = new FetcherStats(this.metricId);
+ this.FetcherLagStats = new FetcherLagStats(this.metricId);
+ this.fetchRequestBuilder =
+ new FetchRequestBuilder().ClientId(clientId)
+ .ReplicaId(fetcherBrokerId)
+ .MaxWait(maxWait)
+ .MinBytes(minBytes);
+ }
+
+ ///
+ /// process fetched Data
+ ///
+ ///
+ ///
+ ///
+ public abstract void ProcessPartitionData(
+ TopicAndPartition topicAndPartition, long fetchOffset, FetchResponsePartitionData partitionData);
+
+ ///
+ /// handle a partition whose offset is out of range and return a new fetch offset
+ ///
+ ///
+ ///
+ public abstract long HandleOffsetOutOfRange(TopicAndPartition topicAndPartition);
+
+ ///
+ /// deal with partitions with errors, potentially due to leadership changes
+ ///
+ ///
+ public abstract void HandlePartitionsWithErrors(IEnumerable partitions);
+
+ public override void Shutdown()
+ {
+ base.Shutdown();
+ this.simpleConsumer.Close();
+ }
+
+ public override void DoWork()
+ {
+ this.partitionMapLock.Lock();
+ try
+ {
+ if (this.partitionMap.Count == 0)
+ {
+ this.partitionMapCond.Await(TimeSpan.FromMilliseconds(200));
+ }
+
+ foreach (var topicAndOffset in this.partitionMap)
+ {
+ var topicAndPartition = topicAndOffset.Key;
+ var offset = topicAndOffset.Value;
+ this.fetchRequestBuilder.AddFetch(topicAndPartition.Topic, topicAndPartition.Partiton, offset, this.fetchSize);
+ }
+ }
+ finally
+ {
+ this.partitionMapLock.Unlock();
+ }
+
+ var fetchRequest = this.fetchRequestBuilder.Build();
+ if (fetchRequest.RequestInfo.Count > 0)
+ {
+ this.ProcessFetchRequest(fetchRequest);
+ }
+ }
+
+ public void ProcessFetchRequest(FetchRequest fetchRequest)
+ {
+ var partitionsWithError = new HashSet();
+ FetchResponse response = null;
+ try
+ {
+ Logger.DebugFormat("issuing to broker {0} of fetch request {1}", this.sourceBroker.Id, fetchRequest);
+ response = this.simpleConsumer.Fetch(fetchRequest);
+ }
+ catch (Exception e)
+ {
+ if (isRunning.Get())
+ {
+ Logger.Error("Error in fetch " + fetchRequest, e);
+ this.partitionMapLock.Lock();
+ try
+ {
+ foreach (var key in this.partitionMap.Keys)
+ {
+ partitionsWithError.Add(key);
+ }
+ }
+ finally
+ {
+ this.partitionMapLock.Unlock();
+ }
+ }
+ }
+
+ this.FetcherStats.RequestRate.Mark();
+
+ if (response != null)
+ {
+ // process fetched Data
+ this.partitionMapLock.Lock();
+ try
+ {
+ foreach (var topicAndData in response.Data)
+ {
+ var topicAndPartition = topicAndData.Key;
+ var partitionData = topicAndData.Value;
+ var topic = topicAndPartition.Topic;
+ var partitionId = topicAndPartition.Partiton;
+ long currentOffset;
+ if (this.partitionMap.TryGetValue(topicAndPartition, out currentOffset)
+ && fetchRequest.RequestInfo[topicAndPartition].Offset == currentOffset)
+ {
+ // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
+ switch (partitionData.Error)
+ {
+ case ErrorMapping.NoError:
+ try
+ {
+ var messages = (ByteBufferMessageSet)partitionData.Messages;
+ var validBytes = messages.ValidBytes;
+ var messageAndOffset =
+ messages.ShallowIterator().ToEnumerable().LastOrDefault();
+ var newOffset = messageAndOffset != null
+ ? messageAndOffset.NextOffset
+ : currentOffset;
+
+ this.partitionMap[topicAndPartition] = newOffset;
+ this.FetcherLagStats.GetFetcherLagStats(topic, partitionId).Lag = partitionData.Hw
+ - newOffset;
+ this.FetcherStats.ByteRate.Mark(validBytes);
+
+ // Once we hand off the partition Data to the subclass, we can't mess with it any more in this thread
+ this.ProcessPartitionData(topicAndPartition, currentOffset, partitionData);
+ }
+ catch (InvalidMessageException ime)
+ {
+ // we log the error and continue. This ensures two things
+ // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
+ // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
+ // should get fixed in the subsequent fetches
+ Logger.ErrorFormat(
+ "Found invalid messages during fetch for partiton [{0},{1}] offset {2} error {3}",
+ topic,
+ partitionId,
+ currentOffset,
+ ime.Message);
+ }
+ catch (Exception e)
+ {
+ throw new KafkaException(
+ string.Format(
+ "error processing Data for partition [{0},{1}] offset {2}",
+ topic,
+ partitionId,
+ currentOffset),
+ e);
+ }
+
+ break;
+ case ErrorMapping.OffsetOutOfRangeCode:
+ try
+ {
+ var newOffset = this.HandleOffsetOutOfRange(topicAndPartition);
+ this.partitionMap[topicAndPartition] = newOffset;
+ Logger.ErrorFormat(
+ "Current offset {0} for partiton [{1},{2}] out of range; reste offset to {3}",
+ currentOffset,
+ topic,
+ partitionId,
+ newOffset);
+ }
+ catch (Exception e)
+ {
+ Logger.Error(
+ string.Format(
+ "Error getting offset for partiton [{0},{1}] to broker {2}",
+ topic,
+ partitionId,
+ sourceBroker.Id),
+ e);
+ partitionsWithError.Add(topicAndPartition);
+ }
+
+ break;
+ default:
+ if (isRunning.Get())
+ {
+ Logger.ErrorFormat(
+ "Error for partition [{0},{1}] to broker {2}:{3}",
+ topic,
+ partitionId,
+ this.sourceBroker.Id,
+ ErrorMapping.ExceptionFor(partitionData.Error).GetType().Name);
+ partitionsWithError.Add(topicAndPartition);
+ }
+
+ break;
+ }
+ }
+ }
+ }
+ finally
+ {
+ this.partitionMapLock.Unlock();
+ }
+ }
+
+ if (partitionsWithError.Count > 0)
+ {
+ Logger.DebugFormat("handling partitions with error for {0}", string.Join(",", partitionsWithError));
+ this.HandlePartitionsWithErrors(partitionsWithError);
+ }
+ }
+
+ public void AddPartitions(IDictionary partitionAndOffsets)
+ {
+ this.partitionMapLock.LockInterruptibly();
+ try
+ {
+ foreach (var topicAndOffset in partitionAndOffsets)
+ {
+ var topicAndPartition = topicAndOffset.Key;
+ var offset = topicAndOffset.Value;
+
+ // If the partitionMap already has the topic/partition, then do not update the map with the old offset
+ if (!this.partitionMap.ContainsKey(topicAndPartition))
+ {
+ this.partitionMap[topicAndPartition] = PartitionTopicInfo.IsOffsetInvalid(offset)
+ ? this.HandleOffsetOutOfRange(topicAndPartition)
+ : offset;
+ }
+
+ this.partitionMapCond.SignalAll();
+ }
+ }
+ finally
+ {
+ this.partitionMapLock.Unlock();
+ }
+ }
+
+ public void RemovePartitions(ISet topicAndPartitions)
+ {
+ this.partitionMapLock.LockInterruptibly();
+ try
+ {
+ foreach (var tp in topicAndPartitions)
+ {
+ this.partitionMap.Remove(tp);
+ }
+ }
+ finally
+ {
+ this.partitionMapLock.Unlock();
+ }
+ }
+
+ public int PartitionCount()
+ {
+ this.partitionMapLock.LockInterruptibly();
+ try
+ {
+ return this.partitionMap.Count;
+ }
+ finally
+ {
+ this.partitionMapLock.Unlock();
+ }
+ }
+ }
+
+ internal class FetcherLagMetrics
+ {
+ private readonly AtomicLong lagVal = new AtomicLong(-1);
+
+ public FetcherLagMetrics(ClientIdBrokerTopicPartition metricId)
+ {
+ MetersFactory.NewGauge(metricId + "-ConsumerLag", () => this.lagVal.Get());
+ }
+
+ internal long Lag
+ {
+ get
+ {
+ return this.lagVal.Get();
+ }
+
+ set
+ {
+ this.lagVal.Set(value);
+ }
+ }
+ }
+
+ internal class FetcherLagStats
+ {
+ private readonly ClientIdAndBroker metricId;
+
+ private readonly Func valueFactory;
+
+ public Pool Stats { get; private set; }
+
+ public FetcherLagStats(ClientIdAndBroker metricId)
+ {
+ this.metricId = metricId;
+ this.valueFactory = k => new FetcherLagMetrics(k);
+ this.Stats = new Pool(this.valueFactory);
+ }
+
+ internal FetcherLagMetrics GetFetcherLagStats(string topic, int partitionId)
+ {
+ return this.Stats.GetAndMaybePut(
+ new ClientIdBrokerTopicPartition(this.metricId.ClientId, this.metricId.BrokerInfo, topic, partitionId));
+ }
+ }
+
+ internal class FetcherStats
+ {
+ public FetcherStats(ClientIdAndBroker metricId)
+ {
+ this.RequestRate = MetersFactory.NewMeter(metricId + "-RequestsPerSec", "requests", TimeSpan.FromSeconds(1));
+ this.ByteRate = MetersFactory.NewMeter(metricId + "-BytesPerSec", "bytes", TimeSpan.FromSeconds(1));
+ }
+
+ internal IMeter RequestRate { get; private set; }
+
+ internal IMeter ByteRate { get; private set; }
+ }
+
+ internal class ClientIdBrokerTopicPartition
+ {
+ public string ClientId { get; private set; }
+
+ public string BrokerInfo { get; private set; }
+
+ public string Topic { get; private set; }
+
+ public int PartitonId { get; private set; }
+
+ public ClientIdBrokerTopicPartition(string clientId, string brokerInfo, string topic, int partitonId)
+ {
+ this.ClientId = clientId;
+ this.BrokerInfo = brokerInfo;
+ this.Topic = topic;
+ this.PartitonId = partitonId;
+ }
+
+ public override string ToString()
+ {
+ return string.Format("{0}-{1}-{2}-{3}", this.ClientId, this.BrokerInfo, this.Topic, this.PartitonId);
+ }
+
+ protected bool Equals(ClientIdBrokerTopicPartition other)
+ {
+ return this.ClientId == other.ClientId && this.BrokerInfo == other.BrokerInfo && this.Topic == other.Topic && this.PartitonId == other.PartitonId;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ if (obj.GetType() != this.GetType())
+ {
+ return false;
+ }
+
+ return this.Equals((ClientIdBrokerTopicPartition)obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return ((this.ClientId != null ? this.ClientId.GetHashCode() : 0) * 397) ^
+ (this.BrokerInfo != null ? this.BrokerInfo.GetHashCode() * 164 : 0) ^
+ (this.Topic != null ? this.Topic.GetHashCode() * 62 : 0) ^
+ (this.PartitonId.GetHashCode());
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/Kafka/Kafka.Client/Utils/ZkUtils.cs b/src/Kafka/Kafka.Client/Utils/ZkUtils.cs
index ac040d3..0b34b5c 100644
--- a/src/Kafka/Kafka.Client/Utils/ZkUtils.cs
+++ b/src/Kafka/Kafka.Client/Utils/ZkUtils.cs
@@ -1,665 +1,665 @@
-namespace Kafka.Client.Utils
-{
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reflection;
- using System.Text;
- using System.Threading;
-
- using Kafka.Client.Clusters;
- using Kafka.Client.Consumers;
- using Kafka.Client.Extensions;
- using Kafka.Client.ZKClient;
- using Kafka.Client.ZKClient.Exceptions;
- using Kafka.Client.ZKClient.Serialize;
-
- using Spring.Threading.Locks;
-
- using log4net;
-
- using Newtonsoft.Json;
- using Newtonsoft.Json.Linq;
-
- using Org.Apache.Zookeeper.Data;
-
- public static class ZkUtils
- {
- private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- public const string ConsumersPath = "/consumers";
-
- public const string BrokerIdsPath = "/brokers/ids";
-
- public const string BrokerTopicsPath = "/brokers/topics";
-
- public const string TopicConfigPath = "/config/topics";
-
- public const string TopicConfigChangesPath = "/config/changes";
-
- public const string ControllerPath = "/controller";
-
- public const string ControllerEpochPath = "/controller_epoch";
-
- public const string ReassignPartitionsPath = "/admin/reassign_partitions";
-
- public const string DeleteTopicsPath = "/admin/delete_topics";
-
- public const string PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election";
-
- public static string GetTopicPath(string topic)
- {
- return BrokerTopicsPath + "/" + topic;
- }
-
- public static string GetTopicPartitionsPath(string topic)
- {
- return GetTopicPath(topic) + "/partitions";
- }
-
- public static string GetTopicConfigPath(string topic)
- {
- return TopicConfigPath + "/" + topic;
- }
-
- public static string GetDeleteTopicPath(string topic)
- {
- return DeleteTopicsPath + "/" + topic;
- }
-
- public static string GetTopicPartitionPath(string topic, int partitionId)
- {
- return GetTopicPartitionsPath(topic) + "/" + partitionId;
- }
-
- public static string GetTopicPartitionLeaderAndIsrPath(string topic, int partitionId)
- {
- return GetTopicPartitionPath(topic, partitionId) + "/" + "state";
- }
-
- public static List GetSortedBrokerList(ZkClient zkClient)
- {
- return GetChildren(zkClient, BrokerIdsPath).Select(int.Parse).OrderBy(x => x).ToList();
- }
-
- public static List GetAllBrokersInCluster(ZkClient zkClient)
- {
- var brokerIds = GetChildrenParentMayNotExist(zkClient, BrokerIdsPath).OrderBy(x => x).ToList();
- return
- brokerIds.Select(int.Parse)
- .Select(id => GetBrokerInfo(zkClient, id))
- .Where(x => x != null)
- .ToList();
- }
-
- public static int? GetLeaderForPartition(ZkClient zkClient, string topic, int partition)
- {
- var leaderAndIsrOpt = ReadDataMaybeNull(zkClient, GetTopicPartitionLeaderAndIsrPath(topic, partition)).Item1;
- if (leaderAndIsrOpt != null)
- {
- return JObject.Parse(leaderAndIsrOpt).SelectToken("leader").Value();
- }
-
- return null;
- }
-
- public static string GetConsumerPartitionOwnerPath(string group, string topic, int partition)
- {
- var topicDirs = new ZKGroupTopicDirs(group, topic);
- return topicDirs.ConsumerOwnerDir + "/" + partition;
- }
-
- ///
- /// Get JSON partition to replica map from zookeeper.
- ///
- ///
- ///
- public static string ReplicaAssignmentZkData(Dictionary> map)
- {
- return JObject.FromObject(new { version = 1, partitions = map }).ToString();
- }
-
-
- ///
- /// make sure a persistent path exists in ZK. Create the path if not exist.
- ///
- ///
- ///
- public static void MakeSurePersistentPathExists(ZkClient client, string path)
- {
- if (!client.Exists(path))
- {
- client.CreatePersistent(path, true); // won't throw NoNodeException or NodeExistsException
- }
- }
-
- ///
- /// create the parent path
- ///
- ///
- ///
- private static void CreateParentPath(ZkClient client, string path)
- {
- var parentDir = path.Substring(0, path.LastIndexOf('/'));
- if (parentDir.Length != 0)
- {
- client.CreatePersistent(parentDir, true);
- }
- }
-
- ///
- /// Create an ephemeral node with the given path and data. Create parents if necessary.
- ///
- ///
- ///
- ///
- private static void CreateEphemeralPath(ZkClient client, string path, string data)
- {
- try
- {
- client.CreateEphemeral(path, data);
- }
- catch (ZkNoNodeException)
- {
- CreateParentPath(client, path);
- client.CreateEphemeral(path, data);
- }
- }
-
- ///
- /// Create an ephemeral node with the given path and data.
- /// Throw NodeExistException if node already exists.
- ///
- ///
- ///
- ///
- public static void CreateEphemeralPathExpectConflict(ZkClient client, string path, string data)
- {
- try
- {
- CreateEphemeralPath(client, path, data);
- }
- catch (ZkNodeExistsException)
- {
- // this can happen when there is connection loss; make sure the Data is what we intend to write
- string storedData = null;
- try
- {
- storedData = ReadData(client, path).Item1;
- }
- catch (ZkNoNodeException)
- {
- // the node disappeared; treat as if node existed and let caller handles this
- }
-
- if (storedData == null || storedData != data)
- {
- Logger.InfoFormat("Conflict in {0} Data: {1}, stored Data: {2}", path, data, storedData);
- throw;
- }
- else
- {
- // otherwise, the creation succeeded, return normally
- Logger.InfoFormat("{0} exists with value {1} during connection loss", path, data);
- }
- }
- }
-
- ///
- /// Create an ephemeral node with the given path and data.
- /// Throw NodeExistsException if node already exists.
- /// Handles the following ZK session timeout b_u_g
- ///
- /// https://issues.apache.org/jira/browse/ZOOKEEPER-1740
- ///
- /// Upon receiving a NodeExistsException, read the data from the conflicted path and
- /// trigger the checker function comparing the read data and the expected data,
- /// If the checker function returns true then the above b_u_g might be encountered, back off and retry;
- /// otherwise re-throw the exception
- ///
- ///
- ///
- ///
- ///
- ///
- ///
- public static void CreateEphemeralPathExpectConflictHandleZKBug(
- ZkClient zkClient,
- string path,
- string data,
- object expectedCallerData,
- Func checker,
- int backoffTime)
- {
- while (true)
- {
- try
- {
- CreateEphemeralPathExpectConflict(zkClient, path, data);
- return;
- }
- catch (ZkNodeExistsException)
- {
- // An ephemeral node may still exist even after its corresponding session has expired
- // due to a Zookeeper ug, in this case we need to retry writing until the previous node is deleted
- // and hence the write succeeds without ZkNodeExistsException
- var writtenData = ReadDataMaybeNull(zkClient, path).Item1;
- if (writtenData != null)
- {
- if (checker(writtenData, expectedCallerData))
- {
- Logger.InfoFormat(
- "I wrote this conflicted ephemeral node [{0}] at {1} a while back in a different session, "
- + "hence I will backoff for this node to be deleted by Zookeeper and retry",
- data,
- path);
-
- Thread.Sleep(backoffTime);
- }
- else
- {
- throw;
- }
- }
- else
- {
- // the node disappeared; retry creating the ephemeral node immediately
- }
- }
- }
- }
-
- ///
- /// Create an persistent node with the given path and data. Create parents if necessary.
- ///
- public static void CreatePersistentPath(ZkClient client, string path, string data)
- {
- try
- {
- client.CreatePersistent(path, data);
- }
- catch (ZkNoNodeException e)
- {
- CreateParentPath(client, path);
- client.CreatePersistent(path, data);
- }
- }
-
- ///
- /// Update the value of a persistent node with the given path and data.
- /// create parrent directory if necessary. Never throw NodeExistException.
- /// Return the updated path zkVersion
- ///
- ///
- ///
- ///
- public static void UpdatePersistentPath(ZkClient client, string path, string data)
- {
- try
- {
- client.WriteData(path, data);
- }
- catch (ZkNoNodeException)
- {
- CreateParentPath(client, path);
- try
- {
- client.CreatePersistent(path, data);
- }
- catch (ZkNodeExistsException)
- {
- client.WriteData(path, data);
- }
- }
- }
-
- public static bool DeletePath(ZkClient client, string path)
- {
- try
- {
- return client.Delete(path);
- }
- catch (ZkNoNodeException)
- {
- // this can happen during a connection loss event, return normally
- Logger.InfoFormat("{0} deleted during connection loss; This is ok. ", path);
- return false;
- }
- }
-
- public static void MaybeDeletePath(string zkUrl, string dir)
- {
- try
- {
- var zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, new ZkStringSerializer());
- zk.DeleteRecursive(dir);
- zk.Dispose();
- }
- catch
- {
- // swallow
- }
- }
-
-
- public static Tuple ReadData(ZkClient client, string path)
- {
- var stat = new Stat();
- var dataString = client.ReadData(path, stat);
- return Tuple.Create(dataString, stat);
- }
-
- public static Tuple ReadDataMaybeNull(ZkClient client, string path)
- {
- var stat = new Stat();
- try
- {
- var obj = client.ReadData(path, stat);
- return Tuple.Create(obj, stat);
- }
- catch (ZkNoNodeException)
- {
- return Tuple.Create((string)null, stat);
- }
- }
-
- public static List GetChildren(ZkClient zkClient, string path)
- {
- return zkClient.GetChildren(path);
- }
-
- public static IList GetChildrenParentMayNotExist(ZkClient client, string path)
- {
- try
- {
- return client.GetChildren(path);
- }
- catch (ZkNoNodeException)
- {
- return null;
- }
- }
-
- public static bool PathExists(ZkClient client, string path)
- {
- return client.Exists(path);
- }
-
- public static Cluster GetCluster(ZkClient zkClient)
- {
- var cluster = new Cluster();
- var nodes = GetChildrenParentMayNotExist(zkClient, BrokerIdsPath);
- foreach (var node in nodes)
- {
- var brokerZkString = ReadData(zkClient, BrokerIdsPath + "/" + node).Item1;
- cluster.Add(Broker.CreateBroker(int.Parse(node), brokerZkString));
- }
-
- return cluster;
- }
-
- public static IDictionary>> GetPartitionAssignmentForTopics(
- ZkClient zkClient, IList topics)
- {
- IDictionary>> ret = new Dictionary>>();
- foreach (var topic in topics)
- {
- var jsonPartitionMap = ReadDataMaybeNull(zkClient, GetTopicPath(topic)).Item1;
- IDictionary> partitionMap = new Dictionary>();
- if (jsonPartitionMap != null)
- {
- var m = JObject.Parse(jsonPartitionMap);
- var replicaMap = (IDictionary)m.Get("partitions");
- if (replicaMap != null)
- {
- partitionMap = replicaMap.ToDictionary(
- kvp => int.Parse(kvp.Key), kvp => kvp.Value.Values().ToList());
- }
- }
-
- Logger.DebugFormat("Partition map for /brokers/topics/{0} is {1}", topic, JObject.FromObject(partitionMap).ToString(Formatting.None));
- ret[topic] = partitionMap;
- }
-
- return ret;
- }
-
- public static IDictionary> GetConsumersPerTopic(ZkClient zkClient, string group)
- {
- var dirs = new ZKGroupDirs(group);
- var consumers = GetChildrenParentMayNotExist(zkClient, dirs.ConsumerRegistryDir);
- var consumerPerTopicMap = new Dictionary>();
- foreach (var consumer in consumers)
- {
- var topicCount = TopicCount.ConstructTopicCount(group, consumer, zkClient);
- foreach (var topicAndConsumer in topicCount.GetConsumerThreadIdsPerTopic())
- {
- var topic = topicAndConsumer.Key;
- var consumerThreadIdSet = topicAndConsumer.Value;
- foreach (var consumerThreadId in consumerThreadIdSet)
- {
- var curConsumers = consumerPerTopicMap.Get(topic);
- if (curConsumers != null)
- {
- curConsumers.Add(consumerThreadId);
- }
- else
- {
- consumerPerTopicMap[topic] = new List { consumerThreadId };
- }
- }
- }
- }
-
- consumerPerTopicMap = consumerPerTopicMap.ToDictionary(x => x.Key, x => x.Value.OrderBy(y => y).ToList());
-
- return consumerPerTopicMap;
- }
-
- ///
- /// This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
- /// or throws an exception if the broker dies before the query to zookeeper finishes
- ///
- /// The zookeeper client connection
- /// The broker id
- /// An optional Broker object encapsulating the broker metadata
- public static Broker GetBrokerInfo(ZkClient zkClient, int brokerId)
- {
- var brokerInfo = ReadDataMaybeNull(zkClient, BrokerIdsPath + "/" + brokerId);
- if (brokerInfo != null)
- {
- return Broker.CreateBroker(brokerId, brokerInfo.Item1);
- }
- else
- {
- return null;
- }
- }
- }
-
- internal class LeaderExistsOrChangedListener : IZkDataListener
- {
- private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- private string topic;
-
- private int partition;
-
- private ReentrantLock leaderLock;
-
- private ICondition leaderExistsOrChanged;
-
- private int? oldLeaderOpt;
-
- private ZkClient zkClient;
-
- public LeaderExistsOrChangedListener(string topic, int partition, ReentrantLock leaderLock, ICondition leaderExistsOrChanged, int? oldLeaderOpt, ZkClient zkClient)
- {
- this.topic = topic;
- this.partition = partition;
- this.leaderLock = leaderLock;
- this.leaderExistsOrChanged = leaderExistsOrChanged;
- this.oldLeaderOpt = oldLeaderOpt;
- this.zkClient = zkClient;
- }
-
- public void HandleDataChange(string dataPath, object data)
- {
- var dataPathSplited = dataPath.Split('/');
- var t = dataPathSplited[dataPathSplited.Length - 4];
- var p = int.Parse(dataPathSplited[dataPathSplited.Length - 2]);
- this.leaderLock.Lock();
-
- try
- {
- if (t == this.topic && p == this.partition)
- {
- if (this.oldLeaderOpt.HasValue == false)
- {
- Logger.DebugFormat(
- "In leader existence listener on partition [{0}, {1}], leader has been created",
- topic,
- partition);
- this.leaderExistsOrChanged.Signal();
- }
- else
- {
- var newLeaderOpt = ZkUtils.GetLeaderForPartition(this.zkClient, t, p);
- if (newLeaderOpt.HasValue && newLeaderOpt.Value != this.oldLeaderOpt.Value)
- {
- Logger.DebugFormat("In leader change listener on partition [{0}, {1}], leader has been moved from {2} to {3}", topic, partition, oldLeaderOpt.Value, newLeaderOpt.Value);
- this.leaderExistsOrChanged.Signal();
- }
- }
- }
- }
- finally
- {
- this.leaderLock.Unlock();
- }
-
- }
-
- public void HandleDataDeleted(string dataPath)
- {
- leaderLock.Lock();
- try
- {
- leaderExistsOrChanged.Signal();
- }
- finally
- {
- leaderLock.Unlock();
- }
- }
- }
-
- public class ZkStringSerializer : IZkSerializer
- {
- public byte[] Serialize(object data)
- {
- return Encoding.UTF8.GetBytes(data.ToString());
- }
-
- public object Deserialize(byte[] bytes)
- {
- if (bytes == null)
- {
- return null;
- }
-
- return Encoding.UTF8.GetString(bytes);
- }
- }
-
- public class ZKGroupDirs
- {
- public string Group { get; set; }
-
- public ZKGroupDirs(string @group)
- {
- this.Group = @group;
- }
-
- public string ConsumerDir
- {
- get
- {
- return ZkUtils.ConsumersPath;
- }
- }
-
- public string ConsumerGroupDir
- {
- get
- {
- return this.ConsumerDir + "/" + this.Group;
- }
- }
-
- public string ConsumerRegistryDir
- {
- get
- {
- return this.ConsumerDir + "/ids";
- }
- }
- }
-
- public class ZKGroupTopicDirs : ZKGroupDirs
- {
- public string Topic { get; private set; }
-
- public ZKGroupTopicDirs(string @group, string topic)
- : base(@group)
- {
- this.Topic = topic;
- }
-
- public string ConsumerOffsetDir
- {
- get
- {
- return this.ConsumerGroupDir + "/offsets/" + this.Topic;
- }
- }
-
- public string ConsumerOwnerDir
- {
- get
- {
- return this.ConsumerGroupDir + "/owners/" + this.Topic;
- }
- }
- }
-
- public class ZkConfig
- {
- public const int DefaultSessionTimeout = 6000;
-
- public const int DefaultConnectionTimeout = 6000;
-
- public const int DefaultSyncTime = 2000;
-
- public ZkConfig()
- : this(null, DefaultSessionTimeout, DefaultConnectionTimeout, DefaultSyncTime)
- {
- }
-
- public ZkConfig(string zkconnect, int zksessionTimeoutMs, int zkconnectionTimeoutMs, int zksyncTimeMs)
- {
- this.ZkConnect = zkconnect;
- this.ZkConnectionTimeoutMs = zkconnectionTimeoutMs;
- this.ZkSessionTimeoutMs = zksessionTimeoutMs;
- this.ZkSyncTimeMs = zksyncTimeMs;
- }
-
- public string ZkConnect { get; set; }
-
- public int ZkSessionTimeoutMs { get; set; }
-
- public int ZkConnectionTimeoutMs { get; set; }
-
- public int ZkSyncTimeMs { get; set; }
- }
+namespace Kafka.Client.Utils
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Reflection;
+ using System.Text;
+ using System.Threading;
+
+ using Kafka.Client.Clusters;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Extensions;
+ using Kafka.Client.ZKClient;
+ using Kafka.Client.ZKClient.Exceptions;
+ using Kafka.Client.ZKClient.Serialize;
+
+ using Spring.Threading.Locks;
+
+ using log4net;
+
+ using Newtonsoft.Json;
+ using Newtonsoft.Json.Linq;
+
+ using Org.Apache.Zookeeper.Data;
+
+ public static class ZkUtils
+ {
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ public const string ConsumersPath = "/consumers";
+
+ public const string BrokerIdsPath = "/brokers/ids";
+
+ public const string BrokerTopicsPath = "/brokers/topics";
+
+ public const string TopicConfigPath = "/config/topics";
+
+ public const string TopicConfigChangesPath = "/config/changes";
+
+ public const string ControllerPath = "/controller";
+
+ public const string ControllerEpochPath = "/controller_epoch";
+
+ public const string ReassignPartitionsPath = "/admin/reassign_partitions";
+
+ public const string DeleteTopicsPath = "/admin/delete_topics";
+
+ public const string PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election";
+
+ public static string GetTopicPath(string topic)
+ {
+ return BrokerTopicsPath + "/" + topic;
+ }
+
+ public static string GetTopicPartitionsPath(string topic)
+ {
+ return GetTopicPath(topic) + "/partitions";
+ }
+
+ public static string GetTopicConfigPath(string topic)
+ {
+ return TopicConfigPath + "/" + topic;
+ }
+
+ public static string GetDeleteTopicPath(string topic)
+ {
+ return DeleteTopicsPath + "/" + topic;
+ }
+
+ public static string GetTopicPartitionPath(string topic, int partitionId)
+ {
+ return GetTopicPartitionsPath(topic) + "/" + partitionId;
+ }
+
+ public static string GetTopicPartitionLeaderAndIsrPath(string topic, int partitionId)
+ {
+ return GetTopicPartitionPath(topic, partitionId) + "/" + "state";
+ }
+
+ public static List GetSortedBrokerList(ZkClient zkClient)
+ {
+ return GetChildren(zkClient, BrokerIdsPath).Select(int.Parse).OrderBy(x => x).ToList();
+ }
+
+ public static List GetAllBrokersInCluster(ZkClient zkClient)
+ {
+ var brokerIds = GetChildrenParentMayNotExist(zkClient, BrokerIdsPath).OrderBy(x => x).ToList();
+ return
+ brokerIds.Select(int.Parse)
+ .Select(id => GetBrokerInfo(zkClient, id))
+ .Where(x => x != null)
+ .ToList();
+ }
+
+ public static int? GetLeaderForPartition(ZkClient zkClient, string topic, int partition)
+ {
+ var leaderAndIsrOpt = ReadDataMaybeNull(zkClient, GetTopicPartitionLeaderAndIsrPath(topic, partition)).Item1;
+ if (leaderAndIsrOpt != null)
+ {
+ return JObject.Parse(leaderAndIsrOpt).SelectToken("leader").Value();
+ }
+
+ return null;
+ }
+
+ public static string GetConsumerPartitionOwnerPath(string group, string topic, int partition)
+ {
+ var topicDirs = new ZKGroupTopicDirs(group, topic);
+ return topicDirs.ConsumerOwnerDir + "/" + partition;
+ }
+
+ ///
+ /// Get JSON partition to replica map from zookeeper.
+ ///
+ ///
+ ///
+ public static string ReplicaAssignmentZkData(Dictionary> map)
+ {
+ return JObject.FromObject(new { version = 1, partitions = map }).ToString();
+ }
+
+
+ ///
+ /// make sure a persistent path exists in ZK. Create the path if not exist.
+ ///
+ ///
+ ///
+ public static void MakeSurePersistentPathExists(ZkClient client, string path)
+ {
+ if (!client.Exists(path))
+ {
+ client.CreatePersistent(path, true); // won't throw NoNodeException or NodeExistsException
+ }
+ }
+
+ ///
+ /// create the parent path
+ ///
+ ///
+ ///
+ private static void CreateParentPath(ZkClient client, string path)
+ {
+ var parentDir = path.Substring(0, path.LastIndexOf('/'));
+ if (parentDir.Length != 0)
+ {
+ client.CreatePersistent(parentDir, true);
+ }
+ }
+
+ ///
+ /// Create an ephemeral node with the given path and data. Create parents if necessary.
+ ///
+ ///
+ ///
+ ///
+ private static void CreateEphemeralPath(ZkClient client, string path, string data)
+ {
+ try
+ {
+ client.CreateEphemeral(path, data);
+ }
+ catch (ZkNoNodeException)
+ {
+ CreateParentPath(client, path);
+ client.CreateEphemeral(path, data);
+ }
+ }
+
+ ///
+ /// Create an ephemeral node with the given path and data.
+ /// Throw NodeExistException if node already exists.
+ ///
+ ///
+ ///
+ ///
+ public static void CreateEphemeralPathExpectConflict(ZkClient client, string path, string data)
+ {
+ try
+ {
+ CreateEphemeralPath(client, path, data);
+ }
+ catch (ZkNodeExistsException)
+ {
+ // this can happen when there is connection loss; make sure the Data is what we intend to write
+ string storedData = null;
+ try
+ {
+ storedData = ReadData(client, path).Item1;
+ }
+ catch (ZkNoNodeException)
+ {
+ // the node disappeared; treat as if node existed and let caller handles this
+ }
+
+ if (storedData == null || storedData != data)
+ {
+ Logger.InfoFormat("Conflict in {0} Data: {1}, stored Data: {2}", path, data, storedData);
+ throw;
+ }
+ else
+ {
+ // otherwise, the creation succeeded, return normally
+ Logger.InfoFormat("{0} exists with value {1} during connection loss", path, data);
+ }
+ }
+ }
+
+ ///
+ /// Create an ephemeral node with the given path and data.
+ /// Throw NodeExistsException if node already exists.
+ /// Handles the following ZK session timeout b_u_g
+ ///
+ /// https://issues.apache.org/jira/browse/ZOOKEEPER-1740
+ ///
+ /// Upon receiving a NodeExistsException, read the data from the conflicted path and
+ /// trigger the checker function comparing the read data and the expected data,
+ /// If the checker function returns true then the above b_u_g might be encountered, back off and retry;
+ /// otherwise re-throw the exception
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static void CreateEphemeralPathExpectConflictHandleZKBug(
+ ZkClient zkClient,
+ string path,
+ string data,
+ object expectedCallerData,
+ Func checker,
+ int backoffTime)
+ {
+ while (true)
+ {
+ try
+ {
+ CreateEphemeralPathExpectConflict(zkClient, path, data);
+ return;
+ }
+ catch (ZkNodeExistsException)
+ {
+ // An ephemeral node may still exist even after its corresponding session has expired
+ // due to a Zookeeper ug, in this case we need to retry writing until the previous node is deleted
+ // and hence the write succeeds without ZkNodeExistsException
+ var writtenData = ReadDataMaybeNull(zkClient, path).Item1;
+ if (writtenData != null)
+ {
+ if (checker(writtenData, expectedCallerData))
+ {
+ Logger.InfoFormat(
+ "I wrote this conflicted ephemeral node [{0}] at {1} a while back in a different session, "
+ + "hence I will backoff for this node to be deleted by Zookeeper and retry",
+ data,
+ path);
+
+ Thread.Sleep(backoffTime);
+ }
+ else
+ {
+ throw;
+ }
+ }
+ else
+ {
+ // the node disappeared; retry creating the ephemeral node immediately
+ }
+ }
+ }
+ }
+
+ ///
+ /// Create an persistent node with the given path and data. Create parents if necessary.
+ ///
+ public static void CreatePersistentPath(ZkClient client, string path, string data)
+ {
+ try
+ {
+ client.CreatePersistent(path, data);
+ }
+ catch (ZkNoNodeException e)
+ {
+ CreateParentPath(client, path);
+ client.CreatePersistent(path, data);
+ }
+ }
+
+ ///
+ /// Update the value of a persistent node with the given path and data.
+ /// create parrent directory if necessary. Never throw NodeExistException.
+ /// Return the updated path zkVersion
+ ///
+ ///
+ ///
+ ///
+ public static void UpdatePersistentPath(ZkClient client, string path, string data)
+ {
+ try
+ {
+ client.WriteData(path, data);
+ }
+ catch (ZkNoNodeException)
+ {
+ CreateParentPath(client, path);
+ try
+ {
+ client.CreatePersistent(path, data);
+ }
+ catch (ZkNodeExistsException)
+ {
+ client.WriteData(path, data);
+ }
+ }
+ }
+
+ public static bool DeletePath(ZkClient client, string path)
+ {
+ try
+ {
+ return client.Delete(path);
+ }
+ catch (ZkNoNodeException)
+ {
+ // this can happen during a connection loss event, return normally
+ Logger.InfoFormat("{0} deleted during connection loss; This is ok. ", path);
+ return false;
+ }
+ }
+
+ public static void MaybeDeletePath(string zkUrl, string dir)
+ {
+ try
+ {
+ var zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, new ZkStringSerializer());
+ zk.DeleteRecursive(dir);
+ zk.Dispose();
+ }
+ catch
+ {
+ // swallow
+ }
+ }
+
+
+ public static Tuple ReadData(ZkClient client, string path)
+ {
+ var stat = new Stat();
+ var dataString = client.ReadData(path, stat);
+ return Tuple.Create(dataString, stat);
+ }
+
+ public static Tuple ReadDataMaybeNull(ZkClient client, string path)
+ {
+ var stat = new Stat();
+ try
+ {
+ var obj = client.ReadData(path, stat);
+ return Tuple.Create(obj, stat);
+ }
+ catch (ZkNoNodeException)
+ {
+ return Tuple.Create((string)null, stat);
+ }
+ }
+
+ public static List GetChildren(ZkClient zkClient, string path)
+ {
+ return zkClient.GetChildren(path);
+ }
+
+ public static IList GetChildrenParentMayNotExist(ZkClient client, string path)
+ {
+ try
+ {
+ return client.GetChildren(path);
+ }
+ catch (ZkNoNodeException)
+ {
+ return null;
+ }
+ }
+
+ public static bool PathExists(ZkClient client, string path)
+ {
+ return client.Exists(path);
+ }
+
+ public static Cluster GetCluster(ZkClient zkClient)
+ {
+ var cluster = new Cluster();
+ var nodes = GetChildrenParentMayNotExist(zkClient, BrokerIdsPath);
+ foreach (var node in nodes)
+ {
+ var brokerZkString = ReadData(zkClient, BrokerIdsPath + "/" + node).Item1;
+ cluster.Add(Broker.CreateBroker(int.Parse(node), brokerZkString));
+ }
+
+ return cluster;
+ }
+
+ public static IDictionary>> GetPartitionAssignmentForTopics(
+ ZkClient zkClient, IList topics)
+ {
+ IDictionary>> ret = new Dictionary>>();
+ foreach (var topic in topics)
+ {
+ var jsonPartitionMap = ReadDataMaybeNull(zkClient, GetTopicPath(topic)).Item1;
+ IDictionary> partitionMap = new Dictionary>();
+ if (jsonPartitionMap != null)
+ {
+ var m = JObject.Parse(jsonPartitionMap);
+ var replicaMap = (IDictionary)m.Get("partitions");
+ if (replicaMap != null)
+ {
+ partitionMap = replicaMap.ToDictionary(
+ kvp => int.Parse(kvp.Key), kvp => kvp.Value.Values().ToList());
+ }
+ }
+
+ Logger.DebugFormat("Partition map for /brokers/topics/{0} is {1}", topic, JObject.FromObject(partitionMap).ToString(Formatting.None));
+ ret[topic] = partitionMap;
+ }
+
+ return ret;
+ }
+
+ public static IDictionary> GetConsumersPerTopic(ZkClient zkClient, string group)
+ {
+ var dirs = new ZKGroupDirs(group);
+ var consumers = GetChildrenParentMayNotExist(zkClient, dirs.ConsumerRegistryDir);
+ var consumerPerTopicMap = new Dictionary>();
+ foreach (var consumer in consumers)
+ {
+ var topicCount = TopicCount.ConstructTopicCount(group, consumer, zkClient);
+ foreach (var topicAndConsumer in topicCount.GetConsumerThreadIdsPerTopic())
+ {
+ var topic = topicAndConsumer.Key;
+ var consumerThreadIdSet = topicAndConsumer.Value;
+ foreach (var consumerThreadId in consumerThreadIdSet)
+ {
+ var curConsumers = consumerPerTopicMap.Get(topic);
+ if (curConsumers != null)
+ {
+ curConsumers.Add(consumerThreadId);
+ }
+ else
+ {
+ consumerPerTopicMap[topic] = new List { consumerThreadId };
+ }
+ }
+ }
+ }
+
+ consumerPerTopicMap = consumerPerTopicMap.ToDictionary(x => x.Key, x => x.Value.OrderBy(y => y).ToList());
+
+ return consumerPerTopicMap;
+ }
+
+ ///
+ /// This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
+ /// or throws an exception if the broker dies before the query to zookeeper finishes
+ ///
+ /// The zookeeper client connection
+ /// The broker id
+ /// An optional Broker object encapsulating the broker metadata
+ public static Broker GetBrokerInfo(ZkClient zkClient, int brokerId)
+ {
+ var brokerInfo = ReadDataMaybeNull(zkClient, BrokerIdsPath + "/" + brokerId);
+ if (brokerInfo != null)
+ {
+ return Broker.CreateBroker(brokerId, brokerInfo.Item1);
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ internal class LeaderExistsOrChangedListener : IZkDataListener
+ {
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ private string topic;
+
+ private int partition;
+
+ private ReentrantLock leaderLock;
+
+ private ICondition leaderExistsOrChanged;
+
+ private int? oldLeaderOpt;
+
+ private ZkClient zkClient;
+
+ public LeaderExistsOrChangedListener(string topic, int partition, ReentrantLock leaderLock, ICondition leaderExistsOrChanged, int? oldLeaderOpt, ZkClient zkClient)
+ {
+ this.topic = topic;
+ this.partition = partition;
+ this.leaderLock = leaderLock;
+ this.leaderExistsOrChanged = leaderExistsOrChanged;
+ this.oldLeaderOpt = oldLeaderOpt;
+ this.zkClient = zkClient;
+ }
+
+ public void HandleDataChange(string dataPath, object data)
+ {
+ var dataPathSplited = dataPath.Split('/');
+ var t = dataPathSplited[dataPathSplited.Length - 4];
+ var p = int.Parse(dataPathSplited[dataPathSplited.Length - 2]);
+ this.leaderLock.Lock();
+
+ try
+ {
+ if (t == this.topic && p == this.partition)
+ {
+ if (this.oldLeaderOpt.HasValue == false)
+ {
+ Logger.DebugFormat(
+ "In leader existence listener on partition [{0}, {1}], leader has been created",
+ topic,
+ partition);
+ this.leaderExistsOrChanged.Signal();
+ }
+ else
+ {
+ var newLeaderOpt = ZkUtils.GetLeaderForPartition(this.zkClient, t, p);
+ if (newLeaderOpt.HasValue && newLeaderOpt.Value != this.oldLeaderOpt.Value)
+ {
+ Logger.DebugFormat("In leader change listener on partition [{0}, {1}], leader has been moved from {2} to {3}", topic, partition, oldLeaderOpt.Value, newLeaderOpt.Value);
+ this.leaderExistsOrChanged.Signal();
+ }
+ }
+ }
+ }
+ finally
+ {
+ this.leaderLock.Unlock();
+ }
+
+ }
+
+ public void HandleDataDeleted(string dataPath)
+ {
+ leaderLock.Lock();
+ try
+ {
+ leaderExistsOrChanged.Signal();
+ }
+ finally
+ {
+ leaderLock.Unlock();
+ }
+ }
+ }
+
+ public class ZkStringSerializer : IZkSerializer
+ {
+ public byte[] Serialize(object data)
+ {
+ return Encoding.UTF8.GetBytes(data.ToString());
+ }
+
+ public object Deserialize(byte[] bytes)
+ {
+ if (bytes == null)
+ {
+ return null;
+ }
+
+ return Encoding.UTF8.GetString(bytes);
+ }
+ }
+
+ public class ZKGroupDirs
+ {
+ public string Group { get; set; }
+
+ public ZKGroupDirs(string @group)
+ {
+ this.Group = @group;
+ }
+
+ public string ConsumerDir
+ {
+ get
+ {
+ return ZkUtils.ConsumersPath;
+ }
+ }
+
+ public string ConsumerGroupDir
+ {
+ get
+ {
+ return this.ConsumerDir + "/" + this.Group;
+ }
+ }
+
+ public string ConsumerRegistryDir
+ {
+ get
+ {
+ return this.ConsumerGroupDir + "/ids";
+ }
+ }
+ }
+
+ public class ZKGroupTopicDirs : ZKGroupDirs
+ {
+ public string Topic { get; private set; }
+
+ public ZKGroupTopicDirs(string @group, string topic)
+ : base(@group)
+ {
+ this.Topic = topic;
+ }
+
+ public string ConsumerOffsetDir
+ {
+ get
+ {
+ return this.ConsumerGroupDir + "/offsets/" + this.Topic;
+ }
+ }
+
+ public string ConsumerOwnerDir
+ {
+ get
+ {
+ return this.ConsumerGroupDir + "/owners/" + this.Topic;
+ }
+ }
+ }
+
+ public class ZkConfig
+ {
+ public const int DefaultSessionTimeout = 6000;
+
+ public const int DefaultConnectionTimeout = 6000;
+
+ public const int DefaultSyncTime = 2000;
+
+ public ZkConfig()
+ : this(null, DefaultSessionTimeout, DefaultConnectionTimeout, DefaultSyncTime)
+ {
+ }
+
+ public ZkConfig(string zkconnect, int zksessionTimeoutMs, int zkconnectionTimeoutMs, int zksyncTimeMs)
+ {
+ this.ZkConnect = zkconnect;
+ this.ZkConnectionTimeoutMs = zkconnectionTimeoutMs;
+ this.ZkSessionTimeoutMs = zksessionTimeoutMs;
+ this.ZkSyncTimeMs = zksyncTimeMs;
+ }
+
+ public string ZkConnect { get; set; }
+
+ public int ZkSessionTimeoutMs { get; set; }
+
+ public int ZkConnectionTimeoutMs { get; set; }
+
+ public int ZkSyncTimeMs { get; set; }
+ }
}
\ No newline at end of file