diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs index 2c6968baa..858fd058b 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs @@ -77,9 +77,13 @@ public interface IClusterConfigurationBuilder /// The topic name /// The number of Topic partitions. Default is to use the cluster-defined partitions. /// The Topic replication factor. Default is to use the cluster-defined replication factor. + /// The configuration to use to create the new topic + /// A map from partition id to replica ids /// IClusterConfigurationBuilder CreateTopicIfNotExists( string topicName, int numberOfPartitions = -1, - short replicationFactor = -1); + short replicationFactor = -1, + IDictionary configs = null, + IDictionary> replicasAssignments = null); } diff --git a/src/KafkaFlow/Clusters/ClusterManager.cs b/src/KafkaFlow/Clusters/ClusterManager.cs index 814088c50..dd37288cf 100644 --- a/src/KafkaFlow/Clusters/ClusterManager.cs +++ b/src/KafkaFlow/Clusters/ClusterManager.cs @@ -122,6 +122,8 @@ public async Task CreateIfNotExistsAsync(IEnumerable configu Name = topicConfiguration.Name, ReplicationFactor = topicConfiguration.Replicas, NumPartitions = topicConfiguration.Partitions, + Configs = topicConfiguration.Configs.ToDictionary(x => x.Key, x => x.Value), + ReplicasAssignments = topicConfiguration.ReplicasAssignments.ToDictionary(x => x.Key, x => x.Value.ToList()), }) .ToArray(); diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index cd85129ef..e594c0520 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -111,9 +111,11 @@ public IClusterConfigurationBuilder OnStarted(Action handle public IClusterConfigurationBuilder CreateTopicIfNotExists( string topicName, int numberOfPartitions = -1, - short replicationFactor = -1) + short replicationFactor = -1, + IDictionary configs = null, + IDictionary> replicasAssignments = null) { - _topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor)); + _topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor, configs, replicasAssignments)); return this; } } diff --git a/src/KafkaFlow/Configuration/TopicConfiguration.cs b/src/KafkaFlow/Configuration/TopicConfiguration.cs index f2eac31bb..672a559fe 100644 --- a/src/KafkaFlow/Configuration/TopicConfiguration.cs +++ b/src/KafkaFlow/Configuration/TopicConfiguration.cs @@ -1,3 +1,7 @@ +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; + namespace KafkaFlow.Configuration; /// @@ -5,17 +9,26 @@ namespace KafkaFlow.Configuration; /// public class TopicConfiguration { + private static readonly IReadOnlyDictionary s_emptyConfigs = new ReadOnlyDictionary(new Dictionary()); + private static readonly IReadOnlyDictionary> s_emptyReplicasAssignments = new ReadOnlyDictionary>(new Dictionary>()); + /// /// Initializes a new instance of the class. /// /// The topic name /// The number of partitions for the topic /// Replication factor for the topic - public TopicConfiguration(string name, int partitions, short replicas) + /// The configuration to use to create the new topic + /// A map from partition id to replica ids + public TopicConfiguration(string name, int partitions, short replicas, IDictionary configs = null, IDictionary> replicasAssignments = null) { this.Name = name; this.Partitions = partitions; this.Replicas = replicas; + this.Configs = configs is null ? s_emptyConfigs : new ReadOnlyDictionary(configs); + this.ReplicasAssignments = replicasAssignments is null + ? s_emptyReplicasAssignments + : new ReadOnlyDictionary>(replicasAssignments.ToDictionary(x => x.Key, x => (IReadOnlyList)x.Value.ToList())); } /// @@ -32,4 +45,14 @@ public TopicConfiguration(string name, int partitions, short replicas) /// Gets the Topic Replication Factor /// public short Replicas { get; } + + /// Gets the configuration to use to create the new topic. + public IReadOnlyDictionary Configs { get; } + + /// + /// Gets the mapping from partition id to replica ids (i.e., static broker ids) or null + /// if the number of partitions and replication factor are specified + /// instead. + /// + public IReadOnlyDictionary> ReplicasAssignments { get; } }