Skip to content

Commit

Permalink
feat: create topic with additional configuration if present
Browse files Browse the repository at this point in the history
  • Loading branch information
robertcoltheart committed Nov 6, 2024
1 parent e121a5b commit f34011b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ public interface IClusterConfigurationBuilder
/// <param name="topicName">The topic name</param>
/// <param name="numberOfPartitions">The number of Topic partitions. Default is to use the cluster-defined partitions.</param>
/// <param name="replicationFactor">The Topic replication factor. Default is to use the cluster-defined replication factor.</param>
/// <param name="configs">Additional topic creation configuration values.</param>
/// <returns></returns>
IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions = -1,
short replicationFactor = -1);
short replicationFactor = -1,
Dictionary<string, string> configs = null);
}
1 change: 1 addition & 0 deletions src/KafkaFlow/Clusters/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable<TopicConfiguration> configu
Name = topicConfiguration.Name,
ReplicationFactor = topicConfiguration.Replicas,
NumPartitions = topicConfiguration.Partitions,
Configs = topicConfiguration.Configs,
})
.ToArray();

Expand Down
5 changes: 3 additions & 2 deletions src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ public IClusterConfigurationBuilder OnStarted(Action<IDependencyResolver> handle
public IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions = -1,
short replicationFactor = -1)
short replicationFactor = -1,
Dictionary<string, string> configs = null)
{
_topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor));
_topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor, configs));
return this;
}
}
11 changes: 10 additions & 1 deletion src/KafkaFlow/Configuration/TopicConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Generic;

namespace KafkaFlow.Configuration;

/// <summary>
Expand All @@ -11,11 +13,13 @@ public class TopicConfiguration
/// <param name="name">The topic name</param>
/// <param name="partitions">The number of partitions for the topic</param>
/// <param name="replicas">Replication factor for the topic</param>
public TopicConfiguration(string name, int partitions, short replicas)
/// <param name="configs">Additional topic creation configuration values.</param>
public TopicConfiguration(string name, int partitions, short replicas, Dictionary<string, string> configs)
{
this.Name = name;
this.Partitions = partitions;
this.Replicas = replicas;
this.Configs = configs;
}

/// <summary>
Expand All @@ -32,4 +36,9 @@ public TopicConfiguration(string name, int partitions, short replicas)
/// Gets the Topic Replication Factor
/// </summary>
public short Replicas { get; }

/// <summary>
/// Gets the topic creation configuration
/// </summary>
public Dictionary<string, string> Configs { get; }
}

0 comments on commit f34011b

Please sign in to comment.