Skip to content

Commit

Permalink
Adds ability to produce to a specific partition (#40)
Browse files Browse the repository at this point in the history
* Add ability to specify partition for producers.

---------

Co-authored-by: WillF <[email protected]>
  • Loading branch information
w-as and WillF authored Mar 15, 2024
1 parent d3d97be commit a55a9ca
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 11 deletions.
32 changes: 30 additions & 2 deletions src/QuixStreams.Streaming.UnitTests/Helpers/TestStreamingClient.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using QuixStreams.Kafka;
using QuixStreams.Kafka.Transport;
using QuixStreams.Kafka.Transport.Tests.Helpers;
Expand All @@ -20,7 +19,6 @@ public class TestStreamingClient : IQuixStreamingClient, IKafkaStreamingClient
private Func<string, TelemetryKafkaProducer> createKafkaProducer;
private Dictionary<string, TestBroker> brokers = new Dictionary<string, TestBroker>();


public TestStreamingClient(CodecType codec = CodecType.Protobuf, TimeSpan publishDelay = default)
{
this.publishDelay = publishDelay;
Expand Down Expand Up @@ -57,6 +55,11 @@ public ITopicProducer GetTopicProducer(string topic)
return topicProducer;
}

public ITopicProducer GetTopicProducer(string topic, int partitionId)
{
return GetTopicProducer(topic);
}

private TestBroker GetBroker(string topic)
{
if (this.brokers.TryGetValue(topic, out var broker)) return broker;
Expand Down Expand Up @@ -85,11 +88,21 @@ IRawTopicProducer IKafkaStreamingClient.GetRawTopicProducer(string topic)
throw new NotImplementedException();
}

IRawTopicProducer IKafkaStreamingClient.GetRawTopicProducer(string topic, int partitionId)
{
throw new NotImplementedException();
}

ITopicProducer IKafkaStreamingClient.GetTopicProducer(string topic)
{
return GetTopicProducer(topic);
}

ITopicProducer IKafkaStreamingClient.GetTopicProducer(string topic, int partitionId)
{
return GetTopicProducer(topic);
}

ITopicConsumer IKafkaStreamingClient.GetTopicConsumer(string topic, string consumerGroup, CommitOptions options, AutoOffsetReset autoOffset)
{
return GetTopicConsumer(topic);
Expand All @@ -111,6 +124,11 @@ IRawTopicProducer IQuixStreamingClient.GetRawTopicProducer(string topicIdOrName)
throw new NotImplementedException();
}

IRawTopicProducer IQuixStreamingClient.GetRawTopicProducer(string topicIdOrName, int partitionId)
{
throw new NotImplementedException();
}

ITopicProducer IQuixStreamingClient.GetTopicProducer(string topicIdOrName)
{
return GetTopicProducer(topicIdOrName);
Expand Down Expand Up @@ -145,9 +163,19 @@ Task<IRawTopicProducer> IQuixStreamingClientAsync.GetRawTopicProducerAsync(strin
return Task.FromResult(((IQuixStreamingClient)this).GetRawTopicProducer(topicIdOrName));
}

Task<IRawTopicProducer> IQuixStreamingClientAsync.GetRawTopicProducerAsync(string topicIdOrName, int partitionId)
{
return Task.FromResult(((IQuixStreamingClient)this).GetRawTopicProducer(topicIdOrName, partitionId));
}

Task<ITopicProducer> IQuixStreamingClientAsync.GetTopicProducerAsync(string topicIdOrName)
{
return Task.FromResult(((IQuixStreamingClient)this).GetTopicProducer(topicIdOrName));
}

Task<ITopicProducer> IQuixStreamingClientAsync.GetTopicProducerAsync(string topicIdOrName, int partitionId)
{
return Task.FromResult(((IQuixStreamingClient)this).GetTopicProducer(topicIdOrName, partitionId));
}
}
}
49 changes: 47 additions & 2 deletions src/QuixStreams.Streaming/KafkaStreamingClient.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using System.Collections.Generic;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using QuixStreams;
using QuixStreams.Kafka;
using QuixStreams.Kafka.Transport;
using QuixStreams.Streaming.Configuration;
Expand Down Expand Up @@ -53,12 +51,28 @@ public interface IKafkaStreamingClient
/// <returns>Instance of <see cref="IRawTopicProducer"/></returns>
IRawTopicProducer GetRawTopicProducer(string topic);

/// <summary>
/// Gets a topic producer capable of publishing non-quixstreams messages.
/// </summary>
/// <param name="topic">Name of the topic.</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>Instance of <see cref="IRawTopicProducer"/></returns>
IRawTopicProducer GetRawTopicProducer(string topic, int partitionId);

/// <summary>
/// Gets a topic producer capable of publishing stream messages.
/// </summary>
/// <param name="topic">Name of the topic.</param>
/// <returns>Instance of <see cref="ITopicProducer"/></returns>
ITopicProducer GetTopicProducer(string topic);

/// <summary>
/// Gets a topic producer capable of publishing stream messages.
/// </summary>
/// <param name="topic">Name of the topic.</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>Instance of <see cref="ITopicProducer"/></returns>
ITopicProducer GetTopicProducer(string topic, int partitionId);
}

/// <summary>
Expand Down Expand Up @@ -207,6 +221,22 @@ public IRawTopicProducer GetRawTopicProducer(string topic)
var rawTopicProducer = new RawTopicProducer(brokerAddress, topic, brokerProperties);

App.Register(rawTopicProducer);

return rawTopicProducer;
}

/// <summary>
/// Gets a topic producer capable of publishing non-quixstreams messages.
/// </summary>
/// <param name="topic">Name of the topic.</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>Instance of <see cref="IRawTopicProducer"/></returns>
public IRawTopicProducer GetRawTopicProducer(string topic, int partitionId)
{
var rawTopicProducer = new RawTopicProducer(brokerAddress, topic, brokerProperties, partitionId);

App.Register(rawTopicProducer);

return rawTopicProducer;
}

Expand All @@ -223,6 +253,21 @@ public ITopicProducer GetTopicProducer(string topic)

return topicProducer;
}

/// <summary>
/// Gets a topic producer capable of publishing stream messages.
/// </summary>
/// <param name="topic">Name of the topic.</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>Instance of <see cref="ITopicProducer"/></returns>
public ITopicProducer GetTopicProducer(string topic, int partitionId)
{
var topicProducer = new TopicProducer(new KafkaProducerConfiguration(brokerAddress, brokerProperties), topic, partitionId);

App.Register(topicProducer);

return topicProducer;
}
}

/// <summary>
Expand Down
84 changes: 83 additions & 1 deletion src/QuixStreams.Streaming/QuixStreamingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using QuixStreams.Kafka;
Expand Down Expand Up @@ -73,12 +72,28 @@ public interface IQuixStreamingClient : IQuixStreamingClientAsync
/// <returns>Instance of <see cref="IRawTopicProducer"/></returns>
IRawTopicProducer GetRawTopicProducer(string topicIdOrName);

/// <summary>
/// Gets a topic producer capable of publishing non-quixstreams messages.
/// </summary>
/// <param name="topic">Name of the topic.</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>Instance of <see cref="IRawTopicProducer"/></returns>
IRawTopicProducer GetRawTopicProducer(string topic, int partitionId);

/// <summary>
/// Gets a topic producer capable of publishing stream messages.
/// </summary>
/// <param name="topicIdOrName">Id or name of the topic. If name is provided, workspace will be derived from environment variable or token, in that order</param>
/// <returns>Instance of <see cref="ITopicProducer"/></returns>
ITopicProducer GetTopicProducer(string topicIdOrName);

/// <summary>
/// Gets a topic producer capable of publishing stream messages.
/// </summary>
/// <param name="topic">Name of the topic.</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>Instance of <see cref="ITopicProducer"/></returns>
ITopicProducer GetTopicProducer(string topic, int partitionId);
}

/// <summary>
Expand Down Expand Up @@ -123,12 +138,28 @@ public interface IQuixStreamingClientAsync
/// <returns>A task returning an instance of <see cref="IRawTopicProducer"/></returns>
Task<IRawTopicProducer> GetRawTopicProducerAsync(string topicIdOrName);

/// <summary>
/// Asynchronously gets a topic producer capable of publishing non-quixstreams messages.
/// </summary>
/// <param name="topicIdOrName">Id or name of the topic. If name is provided, workspace will be derived from environment variable or token, in that order</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>A task returning an instance of <see cref="IRawTopicProducer"/></returns>
Task<IRawTopicProducer> GetRawTopicProducerAsync(string topicIdOrName, int partitionId);

/// <summary>
/// Asynchronously gets a topic producer capable of publishing stream messages.
/// </summary>
/// <param name="topicIdOrName">Id or name of the topic. If name is provided, workspace will be derived from environment variable or token, in that order</param>
/// <returns>A task returning an instance of <see cref="ITopicProducer"/></returns>
Task<ITopicProducer> GetTopicProducerAsync(string topicIdOrName);

/// <summary>
/// Asynchronously gets a topic producer capable of publishing stream messages.
/// </summary>
/// <param name="topicIdOrName">Id or name of the topic. If name is provided, workspace will be derived from environment variable or token, in that order</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>A task returning an instance of <see cref="ITopicProducer"/></returns>
Task<ITopicProducer> GetTopicProducerAsync(string topicIdOrName, int partitionId);
}

/// <summary>
Expand Down Expand Up @@ -337,6 +368,21 @@ public IRawTopicProducer GetRawTopicProducer(string topicIdOrName)
return client.GetRawTopicProducer(topicId);
}

/// <summary>
/// Gets a topic producer capable of publishing non-quixstreams messages.
/// </summary>
/// <param name="topicIdOrName">Id or name of the topic. If name is provided, workspace will be derived from environment variable or token, in that order</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>Instance of <see cref="IRawTopicProducer"/></returns>
public IRawTopicProducer GetRawTopicProducer(string topicIdOrName, int partitionId)
{
if (string.IsNullOrWhiteSpace(topicIdOrName)) throw new ArgumentNullException(nameof(topicIdOrName));

var (client, topicId, _) = this.ValidateTopicAndCreateClient(topicIdOrName).ConfigureAwait(false).GetAwaiter().GetResult();

return client.GetRawTopicProducer(topicId, partitionId);
}

/// <inheritdoc/>
public async Task<IRawTopicProducer> GetRawTopicProducerAsync(string topicIdOrName)
{
Expand All @@ -347,6 +393,17 @@ public async Task<IRawTopicProducer> GetRawTopicProducerAsync(string topicIdOrNa
return client.GetRawTopicProducer(topicId);
}

/// <inheritdoc/>
public async Task<IRawTopicProducer> GetRawTopicProducerAsync(string topicIdOrName, int partitionId)
{
if (string.IsNullOrWhiteSpace(topicIdOrName)) throw new ArgumentNullException(nameof(topicIdOrName));

var (client, topicId, _) = await this.ValidateTopicAndCreateClient(topicIdOrName).ConfigureAwait(false);

return client.GetRawTopicProducer(topicId, partitionId);
}


/// <summary>
/// Gets a topic producer capable of publishing stream messages.
/// </summary>
Expand All @@ -361,6 +418,21 @@ public ITopicProducer GetTopicProducer(string topicIdOrName)
return client.GetTopicProducer(topicId);
}

/// <summary>
/// Gets a topic producer capable of publishing stream messages.
/// </summary>
/// <param name="topicIdOrName">Id or name of the topic. If name is provided, workspace will be derived from environment variable or token, in that order</param>
/// <param name="partitionId">Id of the partition to produce to.</param>
/// <returns>Instance of <see cref="ITopicProducer"/></returns>
public ITopicProducer GetTopicProducer(string topicIdOrName, int partitionId)
{
if (string.IsNullOrWhiteSpace(topicIdOrName)) throw new ArgumentNullException(nameof(topicIdOrName));

var (client, topicId, _) = this.ValidateTopicAndCreateClient(topicIdOrName).ConfigureAwait(false).GetAwaiter().GetResult();

return client.GetTopicProducer(topicId, partitionId);
}

/// <inheritdoc/>
public async Task<ITopicProducer> GetTopicProducerAsync(string topicIdOrName)
{
Expand All @@ -371,6 +443,16 @@ public async Task<ITopicProducer> GetTopicProducerAsync(string topicIdOrName)
return client.GetTopicProducer(topicId);
}

/// <inheritdoc/>
public async Task<ITopicProducer> GetTopicProducerAsync(string topicIdOrName, int partitionId)
{
if (string.IsNullOrWhiteSpace(topicIdOrName)) throw new ArgumentNullException(nameof(topicIdOrName));

var (client, topicId, _) = await this.ValidateTopicAndCreateClient(topicIdOrName).ConfigureAwait(false);

return client.GetTopicProducer(topicId, partitionId);
}

private async Task<(string, CommitOptions)> GetValidConsumerGroup(string topicIdOrName, string originalConsumerGroup, CommitOptions commitOptions)
{
topicIdOrName = topicIdOrName.Trim();
Expand Down
17 changes: 15 additions & 2 deletions src/QuixStreams.Streaming/Raw/RawTopicProducer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using QuixStreams.Kafka;

Expand All @@ -25,18 +26,30 @@ public class RawTopicProducer: IRawTopicProducer
/// <param name="topicName">Name of the topic.</param>
/// <param name="brokerProperties">Additional broker properties</param>
public RawTopicProducer(string brokerAddress, string topicName, Dictionary<string, string> brokerProperties = null)
: this(brokerAddress, topicName, brokerProperties, Partition.Any)
{
}

/// <summary>
/// Initializes a new instance of <see cref="RawTopicProducer"/>
/// </summary>
/// <param name="brokerAddress">Address of Kafka cluster.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="partition">Partition to produce to.</param>
/// <param name="brokerProperties">Additional broker properties</param>
public RawTopicProducer(string brokerAddress, string topicName, Dictionary<string, string> brokerProperties, Partition partition)
{
brokerProperties ??= new Dictionary<string, string>();
if (!brokerProperties.ContainsKey("queued.max.messages.kbytes")) brokerProperties["queued.max.messages.kbytes"] = "20480";

this.topicName = topicName;

var publisherConfiguration = new QuixStreams.Kafka.ProducerConfiguration(brokerAddress, brokerProperties);
var topicConfiguration = new QuixStreams.Kafka.ProducerTopicConfiguration(this.topicName);
var topicConfiguration = new QuixStreams.Kafka.ProducerTopicConfiguration(this.topicName, partition);

this.kafkaProducer = new KafkaProducer(publisherConfiguration, topicConfiguration);
}

/// <summary>
/// Initializes a new instance of <see cref="RawTopicProducer"/>
/// </summary>
Expand Down
22 changes: 18 additions & 4 deletions src/QuixStreams.Streaming/TopicProducer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using QuixStreams.Kafka;
using QuixStreams.Telemetry.Kafka;
Expand Down Expand Up @@ -32,15 +33,28 @@ public TopicProducer(Func<string, TelemetryKafkaProducer> createKafkaProducer)
}

/// <summary>
/// Initializes a new instance of <see cref="TopicProducer"/>
/// Initializes a new instance of the <see cref="TopicProducer"/> class.
/// </summary>
/// <param name="config">Kafka producer configuration.</param>
/// <param name="topic">Name of the topic.</param>
public TopicProducer(KafkaProducerConfiguration config, string topic)
: this(config, topic, Partition.Any)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="TopicProducer"/> class.
/// </summary>
/// <param name="config">Kafka producer configuration.</param>
/// <param name="topic">Name of the topic.</param>
/// <param name="partition">Partition to produce to.</param>
public TopicProducer(KafkaProducerConfiguration config, string topic, Partition partition)
{
this.topic = topic;

var prodConfig = new ProducerConfiguration(config.BrokerList, config.Properties);
var topicConfig = new ProducerTopicConfiguration(topic);
var topicConfig = new ProducerTopicConfiguration(topic, partition);

this.kafkaProducer = new KafkaProducer(prodConfig, topicConfig);

createKafkaProducer = (string streamId) => new TelemetryKafkaProducer(this.kafkaProducer, streamId);
Expand Down

0 comments on commit a55a9ca

Please sign in to comment.