Skip to content

Commit

Permalink
Version 0.8.0 (#90)
Browse files Browse the repository at this point in the history
* Added the Pending consume status, which doesn't ack
* Kafka and gateway changes
* Working Kafka producer without schema registry
  • Loading branch information
alexeyzimarev authored Apr 25, 2022
1 parent 2f0415a commit 3aa97da
Show file tree
Hide file tree
Showing 80 changed files with 1,064 additions and 311 deletions.
64 changes: 43 additions & 21 deletions Eventuous.sln
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Producers", "src\
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.GooglePubSub", "src\GooglePubSub\test\Eventuous.Tests.GooglePubSub\Eventuous.Tests.GooglePubSub.csproj", "{135075B3-AAE5-4C13-87F6-BF3ECF108B37}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway", "src\Shovel\src\Eventuous.Gateway\Eventuous.Gateway.csproj", "{F597091E-E8AD-429B-A8E0-3C7972031F83}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Sut.App", "test\Eventuous.Sut.App\Eventuous.Sut.App.csproj", "{151A0839-2B1F-49D6-B5DD-199A5FAAB610}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Sut.Domain", "test\Eventuous.Sut.Domain\Eventuous.Sut.Domain.csproj", "{09805579-49ED-4B07-B498-047B00DEBCCF}"
Expand Down Expand Up @@ -66,8 +64,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.RabbitMq",
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{C2C31C86-828F-4DBF-8EDA-C312C7BBB54B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway.Tests", "src\Shovel\test\Eventuous.Gateway.Tests\Eventuous.Gateway.Tests.csproj", "{7DC476A6-9BEA-4F29-BFB2-6BBE10577029}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.Subscriptions", "src\Core\test\Eventuous.Tests.Subscriptions\Eventuous.Tests.Subscriptions.csproj", "{8E74DA60-D1DA-45D1-83C5-2F7262E6D342}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{AD2B23E1-0CD1-44B5-82C3-64CA12841885}"
Expand Down Expand Up @@ -110,8 +106,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Spyglass", "src\E
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.ElasticSearch", "src\Experimental\src\Eventuous.ElasticSearch\Eventuous.ElasticSearch.csproj", "{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Connectors.Base", "src\Experimental\src\Eventuous.Connectors.Base\Eventuous.Connectors.Base.csproj", "{859AF2D3-3370-412A-A163-425C42C8A04C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ElasticPlayground", "src\Experimental\src\ElasticPlayground\ElasticPlayground.csproj", "{5555BC0B-418C-49A3-BD68-ADCEBCC518E4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.AspNetCore", "src\Extensions\test\Eventuous.Tests.AspNetCore\Eventuous.Tests.AspNetCore.csproj", "{152E27CE-35F1-4F65-B53A-C7B710F1B310}"
Expand All @@ -120,6 +114,22 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.AspNetCore.
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Sut.AspNetCore", "src\Extensions\test\Eventuous.Sut.AspNetCore\Eventuous.Sut.AspNetCore.csproj", "{1C1033D6-059B-4CEE-A7D8-9EE470053145}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Kafka", "Kafka", "{6E545DFE-FE70-4486-92E0-E47E86E66210}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{00A23BA2-3D94-4498-BE3A-F44B2B7E5996}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Kafka", "src\Kafka\src\Eventuous.Kafka\Eventuous.Kafka.csproj", "{E05C6B72-71FC-4889-A631-6909CFBFB6F6}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{6D487B3B-5B3E-4656-9E29-0B3FA3834B24}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.Kafka", "src\Kafka\test\Eventuous.Tests.Kafka\Eventuous.Tests.Kafka.csproj", "{12D95816-F924-40A5-AF3C-35FE9626F424}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway", "src\Gateway\src\Eventuous.Gateway\Eventuous.Gateway.csproj", "{E3D5B654-C68D-456B-8535-A9C38D37DFD2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway.Tests", "src\Gateway\test\Eventuous.Gateway.Tests\Eventuous.Gateway.Tests.csproj", "{F89E7F27-D198-41DD-AC24-79709EF07537}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Connectors.Base", "src\Gateway\src\Eventuous.Connectors.Base\Eventuous.Connectors.Base.csproj", "{93E90DD3-AFFD-4750-BDE5-47DB0E257761}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -158,10 +168,6 @@ Global
{135075B3-AAE5-4C13-87F6-BF3ECF108B37}.Debug|Any CPU.Build.0 = Debug|Any CPU
{135075B3-AAE5-4C13-87F6-BF3ECF108B37}.Release|Any CPU.ActiveCfg = Release|Any CPU
{135075B3-AAE5-4C13-87F6-BF3ECF108B37}.Release|Any CPU.Build.0 = Release|Any CPU
{F597091E-E8AD-429B-A8E0-3C7972031F83}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F597091E-E8AD-429B-A8E0-3C7972031F83}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F597091E-E8AD-429B-A8E0-3C7972031F83}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F597091E-E8AD-429B-A8E0-3C7972031F83}.Release|Any CPU.Build.0 = Release|Any CPU
{151A0839-2B1F-49D6-B5DD-199A5FAAB610}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{151A0839-2B1F-49D6-B5DD-199A5FAAB610}.Debug|Any CPU.Build.0 = Debug|Any CPU
{151A0839-2B1F-49D6-B5DD-199A5FAAB610}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -202,10 +208,6 @@ Global
{0BE1EE5E-3E12-48C4-8510-9A08C6E0872D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0BE1EE5E-3E12-48C4-8510-9A08C6E0872D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0BE1EE5E-3E12-48C4-8510-9A08C6E0872D}.Release|Any CPU.Build.0 = Release|Any CPU
{7DC476A6-9BEA-4F29-BFB2-6BBE10577029}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7DC476A6-9BEA-4F29-BFB2-6BBE10577029}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7DC476A6-9BEA-4F29-BFB2-6BBE10577029}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7DC476A6-9BEA-4F29-BFB2-6BBE10577029}.Release|Any CPU.Build.0 = Release|Any CPU
{8E74DA60-D1DA-45D1-83C5-2F7262E6D342}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8E74DA60-D1DA-45D1-83C5-2F7262E6D342}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8E74DA60-D1DA-45D1-83C5-2F7262E6D342}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -254,10 +256,6 @@ Global
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Release|Any CPU.Build.0 = Release|Any CPU
{859AF2D3-3370-412A-A163-425C42C8A04C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{859AF2D3-3370-412A-A163-425C42C8A04C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{859AF2D3-3370-412A-A163-425C42C8A04C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{859AF2D3-3370-412A-A163-425C42C8A04C}.Release|Any CPU.Build.0 = Release|Any CPU
{5555BC0B-418C-49A3-BD68-ADCEBCC518E4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5555BC0B-418C-49A3-BD68-ADCEBCC518E4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5555BC0B-418C-49A3-BD68-ADCEBCC518E4}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand All @@ -274,6 +272,26 @@ Global
{1C1033D6-059B-4CEE-A7D8-9EE470053145}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1C1033D6-059B-4CEE-A7D8-9EE470053145}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1C1033D6-059B-4CEE-A7D8-9EE470053145}.Release|Any CPU.Build.0 = Release|Any CPU
{E05C6B72-71FC-4889-A631-6909CFBFB6F6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E05C6B72-71FC-4889-A631-6909CFBFB6F6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E05C6B72-71FC-4889-A631-6909CFBFB6F6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E05C6B72-71FC-4889-A631-6909CFBFB6F6}.Release|Any CPU.Build.0 = Release|Any CPU
{12D95816-F924-40A5-AF3C-35FE9626F424}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{12D95816-F924-40A5-AF3C-35FE9626F424}.Debug|Any CPU.Build.0 = Debug|Any CPU
{12D95816-F924-40A5-AF3C-35FE9626F424}.Release|Any CPU.ActiveCfg = Release|Any CPU
{12D95816-F924-40A5-AF3C-35FE9626F424}.Release|Any CPU.Build.0 = Release|Any CPU
{E3D5B654-C68D-456B-8535-A9C38D37DFD2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E3D5B654-C68D-456B-8535-A9C38D37DFD2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E3D5B654-C68D-456B-8535-A9C38D37DFD2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E3D5B654-C68D-456B-8535-A9C38D37DFD2}.Release|Any CPU.Build.0 = Release|Any CPU
{F89E7F27-D198-41DD-AC24-79709EF07537}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F89E7F27-D198-41DD-AC24-79709EF07537}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F89E7F27-D198-41DD-AC24-79709EF07537}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F89E7F27-D198-41DD-AC24-79709EF07537}.Release|Any CPU.Build.0 = Release|Any CPU
{93E90DD3-AFFD-4750-BDE5-47DB0E257761}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{93E90DD3-AFFD-4750-BDE5-47DB0E257761}.Debug|Any CPU.Build.0 = Debug|Any CPU
{93E90DD3-AFFD-4750-BDE5-47DB0E257761}.Release|Any CPU.ActiveCfg = Release|Any CPU
{93E90DD3-AFFD-4750-BDE5-47DB0E257761}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{151A0839-2B1F-49D6-B5DD-199A5FAAB610} = {C60C6094-2A03-45B6-AB33-C514C35DF823}
Expand All @@ -292,7 +310,6 @@ Global
{BCD7A6A8-C3BF-4ADA-8DA5-243B9053A839} = {4C265245-A432-4361-88B2-9A590A834527}
{0649460A-DE39-4AD4-996A-7104E28C68FB} = {BCD7A6A8-C3BF-4ADA-8DA5-243B9053A839}
{AA3D3779-C490-47DE-857D-1CD8A9F6A354} = {55396859-0971-4980-9B7F-59A1A5BD2B29}
{F597091E-E8AD-429B-A8E0-3C7972031F83} = {AA3D3779-C490-47DE-857D-1CD8A9F6A354}
{A6762149-8450-468B-A28F-1D3F6E586247} = {2E59C5F8-3E5A-4450-B902-7648AD7ECC0F}
{B07B30E3-0028-4068-AAF4-9E20DD35BA8D} = {A6762149-8450-468B-A28F-1D3F6E586247}
{FF3040AA-E5FD-4A34-8E3B-70BAED38A988} = {2E59C5F8-3E5A-4450-B902-7648AD7ECC0F}
Expand All @@ -302,7 +319,6 @@ Global
{487F21BD-1B70-4BDD-B970-4A9DFBA99A65} = {3F40E079-C493-47B1-B20A-9272732D6380}
{0BE1EE5E-3E12-48C4-8510-9A08C6E0872D} = {39A42904-D23D-4A2A-96E4-D762B7C83F78}
{C2C31C86-828F-4DBF-8EDA-C312C7BBB54B} = {55396859-0971-4980-9B7F-59A1A5BD2B29}
{7DC476A6-9BEA-4F29-BFB2-6BBE10577029} = {C2C31C86-828F-4DBF-8EDA-C312C7BBB54B}
{8E74DA60-D1DA-45D1-83C5-2F7262E6D342} = {0ED6785B-60EF-46B4-B938-EF04189FC8BC}
{AD2B23E1-0CD1-44B5-82C3-64CA12841885} = {4C265245-A432-4361-88B2-9A590A834527}
{A627C63B-8F19-4434-AC3E-D8AA012CB04A} = {8A48774A-3D8A-412B-AA4C-6FFB878E3053}
Expand All @@ -321,10 +337,16 @@ Global
{0E2520E7-B4A6-47E7-AED8-662C88441A84} = {B64D59D5-7935-4DFC-8D90-D6EF3D6F2ABA}
{F63DCF76-908F-4F4C-B8A7-4CA4EC34F6C9} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
{859AF2D3-3370-412A-A163-425C42C8A04C} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
{5555BC0B-418C-49A3-BD68-ADCEBCC518E4} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
{152E27CE-35F1-4F65-B53A-C7B710F1B310} = {CCD807D2-F4F2-4EC2-A03D-8943F73993A6}
{B3F782EE-FBEF-47E2-8379-8A91B11363B8} = {CCD807D2-F4F2-4EC2-A03D-8943F73993A6}
{1C1033D6-059B-4CEE-A7D8-9EE470053145} = {CCD807D2-F4F2-4EC2-A03D-8943F73993A6}
{00A23BA2-3D94-4498-BE3A-F44B2B7E5996} = {6E545DFE-FE70-4486-92E0-E47E86E66210}
{E05C6B72-71FC-4889-A631-6909CFBFB6F6} = {00A23BA2-3D94-4498-BE3A-F44B2B7E5996}
{6D487B3B-5B3E-4656-9E29-0B3FA3834B24} = {6E545DFE-FE70-4486-92E0-E47E86E66210}
{12D95816-F924-40A5-AF3C-35FE9626F424} = {6D487B3B-5B3E-4656-9E29-0B3FA3834B24}
{E3D5B654-C68D-456B-8535-A9C38D37DFD2} = {AA3D3779-C490-47DE-857D-1CD8A9F6A354}
{F89E7F27-D198-41DD-AC24-79709EF07537} = {C2C31C86-828F-4DBF-8EDA-C312C7BBB54B}
{93E90DD3-AFFD-4750-BDE5-47DB0E257761} = {AA3D3779-C490-47DE-857D-1CD8A9F6A354}
EndGlobalSection
EndGlobal
38 changes: 38 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,44 @@ services:
- '25672:25672'
- '15672:15672'

zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
container_name: eventuous-zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:7.0.1
container_name: eventuous-kafka
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

kafka-schema-registry:
image: confluentinc/cp-schema-registry:7.0.1
container_name: eventuous-kafka-schema-registry
depends_on:
- zookeeper
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka1:19092,PLAINTEXT://kafka2:19093,PLAINTEXT://kafka3:19094'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

networks:
default:
name: eventuous-network
5 changes: 5 additions & 0 deletions src/Core/src/Eventuous.Producers/AckProduce.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace Eventuous.Producers;

public delegate ValueTask AcknowledgeProduce(ProducedMessage message);

public delegate ValueTask ReportFailedProduce(ProducedMessage message, string error, Exception? exception);
2 changes: 1 addition & 1 deletion src/Core/src/Eventuous.Producers/BaseProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@ public async Task Produce(
public bool Ready { get; private set; }

protected void ReadyNow() => Ready = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ public static (Activity? act, IEnumerable<ProducedMessage> msgs) Start(
return (activity, messages.Select(GetMessage));

ProducedMessage GetMessage(ProducedMessage message)
=> new(
message.Message,
GetMeta(message.Metadata).AddActivityTags(activity),
message.MessageId
);
=> message with {
Metadata = GetMeta(message.Metadata).AddActivityTags(activity),
};
}

static Activity? GetActivity(IEnumerable<KeyValuePair<string, object?>>? tags)
Expand Down
12 changes: 7 additions & 5 deletions src/Core/src/Eventuous.Producers/ProducedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ public ProducedMessage(object message, Metadata? metadata, Guid? messageId = nul
MessageType = TypeMap.GetTypeName(message);
}

public object Message { get; }
public Metadata? Metadata { get; init; }
public Guid MessageId { get; }
public string MessageType { get; }
}
public object Message { get; }
public Metadata? Metadata { get; init; }
public Guid MessageId { get; }
public string MessageType { get; }
public AcknowledgeProduce? OnAck { get; init; }
public ReportFailedProduce? OnNack { get; init; }
}
35 changes: 21 additions & 14 deletions src/Core/src/Eventuous.Producers/ProducerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ public static class ProducerExtensions {
/// <param name="stream">Stream name where the message should be produced</param>
/// <param name="message">Message to produce</param>
/// <param name="metadata"></param>
/// <param name="onAck">Function to confirm that the message was produced</param>
/// <param name="cancellationToken"></param>
/// <typeparam name="TMessage">Message typ</typeparam>
/// <returns></returns>
public static Task Produce<TMessage>(
this IEventProducer producer,
StreamName stream,
TMessage message,
Metadata? metadata = null,
Metadata? metadata,
AcknowledgeProduce? onAck = null,
CancellationToken cancellationToken = default
) where TMessage : class {
var producedMessages =
message is IEnumerable<object> collection
? ConvertMany(collection, metadata)
: ConvertOne(message, metadata);
? ConvertMany(collection, metadata, onAck)
: ConvertOne(message, metadata, onAck);

return producer.Produce(stream, producedMessages, cancellationToken);
}
Expand All @@ -37,6 +39,7 @@ message is IEnumerable<object> collection
/// <param name="message">Message to produce</param>
/// <param name="metadata">Message metadata</param>
/// <param name="options">Produce options</param>
/// <param name="onAck">Function to confirm that the message was produced</param>
/// <param name="cancellationToken"></param>
/// <typeparam name="TMessage">Message type</typeparam>
/// <typeparam name="TProduceOptions"></typeparam>
Expand All @@ -45,22 +48,26 @@ public static Task Produce<TProduceOptions, TMessage>(
this IEventProducer<TProduceOptions> producer,
StreamName stream,
TMessage message,
Metadata? metadata = null,
TProduceOptions? options = null,
Metadata? metadata,
TProduceOptions options,
AcknowledgeProduce? onAck = null,
CancellationToken cancellationToken = default
)
where TMessage : class where TProduceOptions : class {
) where TMessage : class where TProduceOptions : class {
var producedMessages =
Ensure.NotNull(message) is IEnumerable<object> collection
? ConvertMany(collection, metadata)
: ConvertOne(message, metadata);
? ConvertMany(collection, metadata, onAck)
: ConvertOne(message, metadata, onAck);

return producer.Produce(stream, producedMessages, options, cancellationToken);
}

static IEnumerable<ProducedMessage> ConvertMany(IEnumerable<object> messages, Metadata? metadata)
=> messages.Select(x => new ProducedMessage(x, metadata));
static IEnumerable<ProducedMessage> ConvertMany(
IEnumerable<object> messages,
Metadata? metadata,
AcknowledgeProduce? onAck
)
=> messages.Select(x => new ProducedMessage(x, metadata) { OnAck = onAck });

static IEnumerable<ProducedMessage> ConvertOne(object message, Metadata? metadata)
=> new[] { new ProducedMessage(message, metadata) };
}
static IEnumerable<ProducedMessage> ConvertOne(object message, Metadata? metadata, AcknowledgeProduce? onAck)
=> new[] { new ProducedMessage(message, metadata) { OnAck = onAck } };
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,4 @@ public ContextItems AddItem(string key, object? value) {
public static class ContextKeys {
public const string GlobalPosition = nameof(GlobalPosition);
public const string StreamPosition = nameof(StreamPosition);
public const string PartitionId = nameof(PartitionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ public DelayedAckConsumeContext(IMessageConsumeContext inner, Acknowledge acknow
/// <param name="exception">Exception that occurred during message processing</param>
/// <returns></returns>
public ValueTask Fail(Exception exception) => _fail(this, exception);
}

public string? PartitionKey { get; internal set; }
public long PartitionId { get; internal set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c
}
}

await ctx.Acknowledge().NoContext();
if (!ctx.HandlingResults.IsPending())
await ctx.Acknowledge().NoContext();
}
catch (TaskCanceledException) {
ctx.Ignore<ConcurrentFilter>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Eventuous.Subscriptions.Filters.Partitioning;

public static class Partitioner {
public delegate uint GetPartitionHash(IMessageConsumeContext context);
public delegate uint GetPartitionHash(string partitionKey);

public delegate string GetPartitionKey(IMessageConsumeContext context);
}
Loading

0 comments on commit 3aa97da

Please sign in to comment.