diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs
index 2c6968baa..78b92c816 100644
--- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs
@@ -77,9 +77,11 @@ public interface IClusterConfigurationBuilder
/// The topic name
/// The number of Topic partitions. Default is to use the cluster-defined partitions.
/// The Topic replication factor. Default is to use the cluster-defined replication factor.
+ /// Additional topic creation configuration values.
///
IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions = -1,
- short replicationFactor = -1);
+ short replicationFactor = -1,
+ Dictionary configs = null);
}
diff --git a/src/KafkaFlow/Clusters/ClusterManager.cs b/src/KafkaFlow/Clusters/ClusterManager.cs
index 814088c50..73ba26ba2 100644
--- a/src/KafkaFlow/Clusters/ClusterManager.cs
+++ b/src/KafkaFlow/Clusters/ClusterManager.cs
@@ -122,6 +122,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable configu
Name = topicConfiguration.Name,
ReplicationFactor = topicConfiguration.Replicas,
NumPartitions = topicConfiguration.Partitions,
+ Configs = topicConfiguration.Configs,
})
.ToArray();
diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
index 4867262cf..135a5b1fd 100644
--- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
+++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
@@ -111,9 +111,10 @@ public IClusterConfigurationBuilder OnStarted(Action handle
public IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions = -1,
- short replicationFactor = -1)
+ short replicationFactor = -1,
+ Dictionary configs = null)
{
- _topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor));
+ _topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor, configs));
return this;
}
}
diff --git a/src/KafkaFlow/Configuration/TopicConfiguration.cs b/src/KafkaFlow/Configuration/TopicConfiguration.cs
index f2eac31bb..e22d4a3f0 100644
--- a/src/KafkaFlow/Configuration/TopicConfiguration.cs
+++ b/src/KafkaFlow/Configuration/TopicConfiguration.cs
@@ -1,3 +1,5 @@
+using System.Collections.Generic;
+
namespace KafkaFlow.Configuration;
///
@@ -11,11 +13,13 @@ public class TopicConfiguration
/// The topic name
/// The number of partitions for the topic
/// Replication factor for the topic
- public TopicConfiguration(string name, int partitions, short replicas)
+ /// Additional topic creation configuration values.
+ public TopicConfiguration(string name, int partitions, short replicas, Dictionary configs)
{
this.Name = name;
this.Partitions = partitions;
this.Replicas = replicas;
+ this.Configs = configs;
}
///
@@ -32,4 +36,9 @@ public TopicConfiguration(string name, int partitions, short replicas)
/// Gets the Topic Replication Factor
///
public short Replicas { get; }
+
+ ///
+ /// Gets the topic creation configuration
+ ///
+ public Dictionary Configs { get; }
}