Skip to content

Commit

Permalink
feat: create BatchProduceAsync method
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Feb 2, 2021
1 parent 95a2f8b commit 7368169
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 7 deletions.
19 changes: 12 additions & 7 deletions samples/KafkaFlow.Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Sample
{
using System;
using System.Linq;
using System.Threading.Tasks;
using global::Microsoft.Extensions.DependencyInjection;
using KafkaFlow.Admin;
Expand Down Expand Up @@ -81,13 +82,17 @@ private static async Task Main()
switch (input)
{
case var _ when int.TryParse(input, out var count):
for (var i = 0; i < count; i++)
{
producers[producerName]
.Produce(
Guid.NewGuid().ToString(),
new TestMessage { Text = $"Message: {Guid.NewGuid()}" });
}
var result = await producers[producerName]
.BatchProduceAsync(
Enumerable
.Range(0, count)
.Select(
x => new BatchProduceItem(
"test-topic",
Guid.NewGuid().ToString(),
new TestMessage { Text = $"Message: {Guid.NewGuid()}" },
null))
.ToList());

break;

Expand Down
130 changes: 130 additions & 0 deletions src/KafkaFlow/Producers/BatchProduceExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
namespace KafkaFlow.Producers
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

/// <summary>
/// </summary>
public static class BatchProduceExtension
{
/// <summary>
/// Calls the Produce() method in loop for high throughput scenarios
/// </summary>
/// <param name="producer"></param>
/// <param name="items">All messages to produce</param>
/// <param name="throwIfAnyProduceFail">indicates if the method should throw a <see cref="BatchProduceException"/> if any message fail</param>
/// <returns>A Task that will be marked as completed when all produce operations end</returns>
public static Task<IReadOnlyCollection<BatchProduceItem>> BatchProduceAsync(
this IMessageProducer producer,
IReadOnlyCollection<BatchProduceItem> items,
bool throwIfAnyProduceFail = true)
{
var completionSource = new TaskCompletionSource<IReadOnlyCollection<BatchProduceItem>>();

var pendingProduceCount = items.Count;
var hasErrors = false;

if (pendingProduceCount == 0)
{
completionSource.SetResult(items);
}

foreach (var item in items)
{
producer.Produce(
item.Topic,
item.PartitionKey,
item.Message,
item.Headers,
report =>
{
item.DeliveryReport = report;
if (report.Error.IsError)
{
hasErrors = true;
}
if (Interlocked.Decrement(ref pendingProduceCount) != 0)
{
return;
}
if (throwIfAnyProduceFail && hasErrors)
{
completionSource.SetException(new BatchProduceException(items));
}
else
{
completionSource.SetResult(items);
}
});
}

return completionSource.Task;
}
}

/// <summary>
/// Represents a message to be produced in batch
/// </summary>
public class BatchProduceItem
{
/// <summary>
/// The message topic name
/// </summary>
public string Topic { get; }

/// <summary>
/// The message partition key
/// </summary>
public string PartitionKey { get; }

/// <summary>
/// The message object
/// </summary>
public object Message { get; }

/// <summary>
/// The message headers
/// </summary>
public IMessageHeaders Headers { get; }

/// <summary>
/// The delivery report after the production
/// </summary>
public DeliveryReport<byte[], byte[]> DeliveryReport { get; internal set; }

/// <summary>
/// Creates a batch produce item
/// </summary>
/// <param name="topic"></param>
/// <param name="partitionKey"></param>
/// <param name="message"></param>
/// <param name="headers"></param>
public BatchProduceItem(
string topic,
string partitionKey,
object message,
IMessageHeaders headers)
{
this.Topic = topic;
this.PartitionKey = partitionKey;
this.Message = message;
this.Headers = headers;
}
}

public class BatchProduceException : Exception
{
public IReadOnlyCollection<BatchProduceItem> Items { get; }

public BatchProduceException(IReadOnlyCollection<BatchProduceItem> items)
{
this.Items = items;
}
}
}

0 comments on commit 7368169

Please sign in to comment.