diff --git a/Eventuous.sln b/Eventuous.sln index 3bb3781a..c48f02d5 100644 --- a/Eventuous.sln +++ b/Eventuous.sln @@ -14,8 +14,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Producers", "src\ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.GooglePubSub", "src\GooglePubSub\test\Eventuous.Tests.GooglePubSub\Eventuous.Tests.GooglePubSub.csproj", "{135075B3-AAE5-4C13-87F6-BF3ECF108B37}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway", "src\Shovel\src\Eventuous.Gateway\Eventuous.Gateway.csproj", "{F597091E-E8AD-429B-A8E0-3C7972031F83}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Sut.App", "test\Eventuous.Sut.App\Eventuous.Sut.App.csproj", "{151A0839-2B1F-49D6-B5DD-199A5FAAB610}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Sut.Domain", "test\Eventuous.Sut.Domain\Eventuous.Sut.Domain.csproj", "{09805579-49ED-4B07-B498-047B00DEBCCF}" @@ -66,8 +64,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.RabbitMq", EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{C2C31C86-828F-4DBF-8EDA-C312C7BBB54B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway.Tests", "src\Shovel\test\Eventuous.Gateway.Tests\Eventuous.Gateway.Tests.csproj", "{7DC476A6-9BEA-4F29-BFB2-6BBE10577029}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.Subscriptions", "src\Core\test\Eventuous.Tests.Subscriptions\Eventuous.Tests.Subscriptions.csproj", "{8E74DA60-D1DA-45D1-83C5-2F7262E6D342}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{AD2B23E1-0CD1-44B5-82C3-64CA12841885}" @@ -110,8 +106,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Spyglass", "src\E EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.ElasticSearch", "src\Experimental\src\Eventuous.ElasticSearch\Eventuous.ElasticSearch.csproj", "{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Connectors.Base", "src\Experimental\src\Eventuous.Connectors.Base\Eventuous.Connectors.Base.csproj", "{859AF2D3-3370-412A-A163-425C42C8A04C}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ElasticPlayground", "src\Experimental\src\ElasticPlayground\ElasticPlayground.csproj", "{5555BC0B-418C-49A3-BD68-ADCEBCC518E4}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.AspNetCore", "src\Extensions\test\Eventuous.Tests.AspNetCore\Eventuous.Tests.AspNetCore.csproj", "{152E27CE-35F1-4F65-B53A-C7B710F1B310}" @@ -120,6 +114,22 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.AspNetCore. EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Sut.AspNetCore", "src\Extensions\test\Eventuous.Sut.AspNetCore\Eventuous.Sut.AspNetCore.csproj", "{1C1033D6-059B-4CEE-A7D8-9EE470053145}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Kafka", "Kafka", "{6E545DFE-FE70-4486-92E0-E47E86E66210}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{00A23BA2-3D94-4498-BE3A-F44B2B7E5996}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Kafka", "src\Kafka\src\Eventuous.Kafka\Eventuous.Kafka.csproj", "{E05C6B72-71FC-4889-A631-6909CFBFB6F6}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{6D487B3B-5B3E-4656-9E29-0B3FA3834B24}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.Kafka", "src\Kafka\test\Eventuous.Tests.Kafka\Eventuous.Tests.Kafka.csproj", "{12D95816-F924-40A5-AF3C-35FE9626F424}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway", "src\Gateway\src\Eventuous.Gateway\Eventuous.Gateway.csproj", "{E3D5B654-C68D-456B-8535-A9C38D37DFD2}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway.Tests", "src\Gateway\test\Eventuous.Gateway.Tests\Eventuous.Gateway.Tests.csproj", "{F89E7F27-D198-41DD-AC24-79709EF07537}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Connectors.Base", "src\Gateway\src\Eventuous.Connectors.Base\Eventuous.Connectors.Base.csproj", "{93E90DD3-AFFD-4750-BDE5-47DB0E257761}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -158,10 +168,6 @@ Global {135075B3-AAE5-4C13-87F6-BF3ECF108B37}.Debug|Any CPU.Build.0 = Debug|Any CPU {135075B3-AAE5-4C13-87F6-BF3ECF108B37}.Release|Any CPU.ActiveCfg = Release|Any CPU {135075B3-AAE5-4C13-87F6-BF3ECF108B37}.Release|Any CPU.Build.0 = Release|Any CPU - {F597091E-E8AD-429B-A8E0-3C7972031F83}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {F597091E-E8AD-429B-A8E0-3C7972031F83}.Debug|Any CPU.Build.0 = Debug|Any CPU - {F597091E-E8AD-429B-A8E0-3C7972031F83}.Release|Any CPU.ActiveCfg = Release|Any CPU - {F597091E-E8AD-429B-A8E0-3C7972031F83}.Release|Any CPU.Build.0 = Release|Any CPU {151A0839-2B1F-49D6-B5DD-199A5FAAB610}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {151A0839-2B1F-49D6-B5DD-199A5FAAB610}.Debug|Any CPU.Build.0 = Debug|Any CPU {151A0839-2B1F-49D6-B5DD-199A5FAAB610}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -202,10 +208,6 @@ Global {0BE1EE5E-3E12-48C4-8510-9A08C6E0872D}.Debug|Any CPU.Build.0 = Debug|Any CPU {0BE1EE5E-3E12-48C4-8510-9A08C6E0872D}.Release|Any CPU.ActiveCfg = Release|Any CPU {0BE1EE5E-3E12-48C4-8510-9A08C6E0872D}.Release|Any CPU.Build.0 = Release|Any CPU - {7DC476A6-9BEA-4F29-BFB2-6BBE10577029}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {7DC476A6-9BEA-4F29-BFB2-6BBE10577029}.Debug|Any CPU.Build.0 = Debug|Any CPU - {7DC476A6-9BEA-4F29-BFB2-6BBE10577029}.Release|Any CPU.ActiveCfg = Release|Any CPU - {7DC476A6-9BEA-4F29-BFB2-6BBE10577029}.Release|Any CPU.Build.0 = Release|Any CPU {8E74DA60-D1DA-45D1-83C5-2F7262E6D342}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {8E74DA60-D1DA-45D1-83C5-2F7262E6D342}.Debug|Any CPU.Build.0 = Debug|Any CPU {8E74DA60-D1DA-45D1-83C5-2F7262E6D342}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -254,10 +256,6 @@ Global {8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Debug|Any CPU.Build.0 = Debug|Any CPU {8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Release|Any CPU.ActiveCfg = Release|Any CPU {8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Release|Any CPU.Build.0 = Release|Any CPU - {859AF2D3-3370-412A-A163-425C42C8A04C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {859AF2D3-3370-412A-A163-425C42C8A04C}.Debug|Any CPU.Build.0 = Debug|Any CPU - {859AF2D3-3370-412A-A163-425C42C8A04C}.Release|Any CPU.ActiveCfg = Release|Any CPU - {859AF2D3-3370-412A-A163-425C42C8A04C}.Release|Any CPU.Build.0 = Release|Any CPU {5555BC0B-418C-49A3-BD68-ADCEBCC518E4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {5555BC0B-418C-49A3-BD68-ADCEBCC518E4}.Debug|Any CPU.Build.0 = Debug|Any CPU {5555BC0B-418C-49A3-BD68-ADCEBCC518E4}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -274,6 +272,26 @@ Global {1C1033D6-059B-4CEE-A7D8-9EE470053145}.Debug|Any CPU.Build.0 = Debug|Any CPU {1C1033D6-059B-4CEE-A7D8-9EE470053145}.Release|Any CPU.ActiveCfg = Release|Any CPU {1C1033D6-059B-4CEE-A7D8-9EE470053145}.Release|Any CPU.Build.0 = Release|Any CPU + {E05C6B72-71FC-4889-A631-6909CFBFB6F6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E05C6B72-71FC-4889-A631-6909CFBFB6F6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E05C6B72-71FC-4889-A631-6909CFBFB6F6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E05C6B72-71FC-4889-A631-6909CFBFB6F6}.Release|Any CPU.Build.0 = Release|Any CPU + {12D95816-F924-40A5-AF3C-35FE9626F424}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {12D95816-F924-40A5-AF3C-35FE9626F424}.Debug|Any CPU.Build.0 = Debug|Any CPU + {12D95816-F924-40A5-AF3C-35FE9626F424}.Release|Any CPU.ActiveCfg = Release|Any CPU + {12D95816-F924-40A5-AF3C-35FE9626F424}.Release|Any CPU.Build.0 = Release|Any CPU + {E3D5B654-C68D-456B-8535-A9C38D37DFD2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E3D5B654-C68D-456B-8535-A9C38D37DFD2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E3D5B654-C68D-456B-8535-A9C38D37DFD2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E3D5B654-C68D-456B-8535-A9C38D37DFD2}.Release|Any CPU.Build.0 = Release|Any CPU + {F89E7F27-D198-41DD-AC24-79709EF07537}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F89E7F27-D198-41DD-AC24-79709EF07537}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F89E7F27-D198-41DD-AC24-79709EF07537}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F89E7F27-D198-41DD-AC24-79709EF07537}.Release|Any CPU.Build.0 = Release|Any CPU + {93E90DD3-AFFD-4750-BDE5-47DB0E257761}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {93E90DD3-AFFD-4750-BDE5-47DB0E257761}.Debug|Any CPU.Build.0 = Debug|Any CPU + {93E90DD3-AFFD-4750-BDE5-47DB0E257761}.Release|Any CPU.ActiveCfg = Release|Any CPU + {93E90DD3-AFFD-4750-BDE5-47DB0E257761}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {151A0839-2B1F-49D6-B5DD-199A5FAAB610} = {C60C6094-2A03-45B6-AB33-C514C35DF823} @@ -292,7 +310,6 @@ Global {BCD7A6A8-C3BF-4ADA-8DA5-243B9053A839} = {4C265245-A432-4361-88B2-9A590A834527} {0649460A-DE39-4AD4-996A-7104E28C68FB} = {BCD7A6A8-C3BF-4ADA-8DA5-243B9053A839} {AA3D3779-C490-47DE-857D-1CD8A9F6A354} = {55396859-0971-4980-9B7F-59A1A5BD2B29} - {F597091E-E8AD-429B-A8E0-3C7972031F83} = {AA3D3779-C490-47DE-857D-1CD8A9F6A354} {A6762149-8450-468B-A28F-1D3F6E586247} = {2E59C5F8-3E5A-4450-B902-7648AD7ECC0F} {B07B30E3-0028-4068-AAF4-9E20DD35BA8D} = {A6762149-8450-468B-A28F-1D3F6E586247} {FF3040AA-E5FD-4A34-8E3B-70BAED38A988} = {2E59C5F8-3E5A-4450-B902-7648AD7ECC0F} @@ -302,7 +319,6 @@ Global {487F21BD-1B70-4BDD-B970-4A9DFBA99A65} = {3F40E079-C493-47B1-B20A-9272732D6380} {0BE1EE5E-3E12-48C4-8510-9A08C6E0872D} = {39A42904-D23D-4A2A-96E4-D762B7C83F78} {C2C31C86-828F-4DBF-8EDA-C312C7BBB54B} = {55396859-0971-4980-9B7F-59A1A5BD2B29} - {7DC476A6-9BEA-4F29-BFB2-6BBE10577029} = {C2C31C86-828F-4DBF-8EDA-C312C7BBB54B} {8E74DA60-D1DA-45D1-83C5-2F7262E6D342} = {0ED6785B-60EF-46B4-B938-EF04189FC8BC} {AD2B23E1-0CD1-44B5-82C3-64CA12841885} = {4C265245-A432-4361-88B2-9A590A834527} {A627C63B-8F19-4434-AC3E-D8AA012CB04A} = {8A48774A-3D8A-412B-AA4C-6FFB878E3053} @@ -321,10 +337,16 @@ Global {0E2520E7-B4A6-47E7-AED8-662C88441A84} = {B64D59D5-7935-4DFC-8D90-D6EF3D6F2ABA} {F63DCF76-908F-4F4C-B8A7-4CA4EC34F6C9} = {0E2520E7-B4A6-47E7-AED8-662C88441A84} {8B9741E1-EB6A-40C9-B30D-0549A1849B9D} = {0E2520E7-B4A6-47E7-AED8-662C88441A84} - {859AF2D3-3370-412A-A163-425C42C8A04C} = {0E2520E7-B4A6-47E7-AED8-662C88441A84} {5555BC0B-418C-49A3-BD68-ADCEBCC518E4} = {0E2520E7-B4A6-47E7-AED8-662C88441A84} {152E27CE-35F1-4F65-B53A-C7B710F1B310} = {CCD807D2-F4F2-4EC2-A03D-8943F73993A6} {B3F782EE-FBEF-47E2-8379-8A91B11363B8} = {CCD807D2-F4F2-4EC2-A03D-8943F73993A6} {1C1033D6-059B-4CEE-A7D8-9EE470053145} = {CCD807D2-F4F2-4EC2-A03D-8943F73993A6} + {00A23BA2-3D94-4498-BE3A-F44B2B7E5996} = {6E545DFE-FE70-4486-92E0-E47E86E66210} + {E05C6B72-71FC-4889-A631-6909CFBFB6F6} = {00A23BA2-3D94-4498-BE3A-F44B2B7E5996} + {6D487B3B-5B3E-4656-9E29-0B3FA3834B24} = {6E545DFE-FE70-4486-92E0-E47E86E66210} + {12D95816-F924-40A5-AF3C-35FE9626F424} = {6D487B3B-5B3E-4656-9E29-0B3FA3834B24} + {E3D5B654-C68D-456B-8535-A9C38D37DFD2} = {AA3D3779-C490-47DE-857D-1CD8A9F6A354} + {F89E7F27-D198-41DD-AC24-79709EF07537} = {C2C31C86-828F-4DBF-8EDA-C312C7BBB54B} + {93E90DD3-AFFD-4750-BDE5-47DB0E257761} = {AA3D3779-C490-47DE-857D-1CD8A9F6A354} EndGlobalSection EndGlobal diff --git a/docker-compose.yml b/docker-compose.yml index b4b8c99a..8fa5c671 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,6 +37,44 @@ services: - '25672:25672' - '15672:15672' + zookeeper: + image: confluentinc/cp-zookeeper:7.0.1 + container_name: eventuous-zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:7.0.1 + container_name: eventuous-kafka + ports: + # To learn about configuring Kafka for access across networks see + # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + + kafka-schema-registry: + image: confluentinc/cp-schema-registry:7.0.1 + container_name: eventuous-kafka-schema-registry + depends_on: + - zookeeper + - kafka + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka1:19092,PLAINTEXT://kafka2:19093,PLAINTEXT://kafka3:19094' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + networks: default: name: eventuous-network diff --git a/src/Core/src/Eventuous.Producers/AckProduce.cs b/src/Core/src/Eventuous.Producers/AckProduce.cs new file mode 100644 index 00000000..596ffadf --- /dev/null +++ b/src/Core/src/Eventuous.Producers/AckProduce.cs @@ -0,0 +1,5 @@ +namespace Eventuous.Producers; + +public delegate ValueTask AcknowledgeProduce(ProducedMessage message); + +public delegate ValueTask ReportFailedProduce(ProducedMessage message, string error, Exception? exception); diff --git a/src/Core/src/Eventuous.Producers/BaseProducer.cs b/src/Core/src/Eventuous.Producers/BaseProducer.cs index 398f2c45..2ec77f72 100644 --- a/src/Core/src/Eventuous.Producers/BaseProducer.cs +++ b/src/Core/src/Eventuous.Producers/BaseProducer.cs @@ -87,4 +87,4 @@ public async Task Produce( public bool Ready { get; private set; } protected void ReadyNow() => Ready = true; -} \ No newline at end of file +} diff --git a/src/Core/src/Eventuous.Producers/Diagnostics/ProducerActivity.cs b/src/Core/src/Eventuous.Producers/Diagnostics/ProducerActivity.cs index ad9de5f2..95ceec4a 100644 --- a/src/Core/src/Eventuous.Producers/Diagnostics/ProducerActivity.cs +++ b/src/Core/src/Eventuous.Producers/Diagnostics/ProducerActivity.cs @@ -46,11 +46,9 @@ public static (Activity? act, IEnumerable msgs) Start( return (activity, messages.Select(GetMessage)); ProducedMessage GetMessage(ProducedMessage message) - => new( - message.Message, - GetMeta(message.Metadata).AddActivityTags(activity), - message.MessageId - ); + => message with { + Metadata = GetMeta(message.Metadata).AddActivityTags(activity), + }; } static Activity? GetActivity(IEnumerable>? tags) diff --git a/src/Core/src/Eventuous.Producers/ProducedMessage.cs b/src/Core/src/Eventuous.Producers/ProducedMessage.cs index 1df746c5..6c51fcdd 100644 --- a/src/Core/src/Eventuous.Producers/ProducedMessage.cs +++ b/src/Core/src/Eventuous.Producers/ProducedMessage.cs @@ -8,8 +8,10 @@ public ProducedMessage(object message, Metadata? metadata, Guid? messageId = nul MessageType = TypeMap.GetTypeName(message); } - public object Message { get; } - public Metadata? Metadata { get; init; } - public Guid MessageId { get; } - public string MessageType { get; } -} \ No newline at end of file + public object Message { get; } + public Metadata? Metadata { get; init; } + public Guid MessageId { get; } + public string MessageType { get; } + public AcknowledgeProduce? OnAck { get; init; } + public ReportFailedProduce? OnNack { get; init; } +} diff --git a/src/Core/src/Eventuous.Producers/ProducerExtensions.cs b/src/Core/src/Eventuous.Producers/ProducerExtensions.cs index 0b97a3b5..36994d41 100644 --- a/src/Core/src/Eventuous.Producers/ProducerExtensions.cs +++ b/src/Core/src/Eventuous.Producers/ProducerExtensions.cs @@ -10,6 +10,7 @@ public static class ProducerExtensions { /// Stream name where the message should be produced /// Message to produce /// + /// Function to confirm that the message was produced /// /// Message typ /// @@ -17,13 +18,14 @@ public static Task Produce( this IEventProducer producer, StreamName stream, TMessage message, - Metadata? metadata = null, + Metadata? metadata, + AcknowledgeProduce? onAck = null, CancellationToken cancellationToken = default ) where TMessage : class { var producedMessages = message is IEnumerable collection - ? ConvertMany(collection, metadata) - : ConvertOne(message, metadata); + ? ConvertMany(collection, metadata, onAck) + : ConvertOne(message, metadata, onAck); return producer.Produce(stream, producedMessages, cancellationToken); } @@ -37,6 +39,7 @@ message is IEnumerable collection /// Message to produce /// Message metadata /// Produce options + /// Function to confirm that the message was produced /// /// Message type /// @@ -45,22 +48,26 @@ public static Task Produce( this IEventProducer producer, StreamName stream, TMessage message, - Metadata? metadata = null, - TProduceOptions? options = null, + Metadata? metadata, + TProduceOptions options, + AcknowledgeProduce? onAck = null, CancellationToken cancellationToken = default - ) - where TMessage : class where TProduceOptions : class { + ) where TMessage : class where TProduceOptions : class { var producedMessages = Ensure.NotNull(message) is IEnumerable collection - ? ConvertMany(collection, metadata) - : ConvertOne(message, metadata); + ? ConvertMany(collection, metadata, onAck) + : ConvertOne(message, metadata, onAck); return producer.Produce(stream, producedMessages, options, cancellationToken); } - static IEnumerable ConvertMany(IEnumerable messages, Metadata? metadata) - => messages.Select(x => new ProducedMessage(x, metadata)); + static IEnumerable ConvertMany( + IEnumerable messages, + Metadata? metadata, + AcknowledgeProduce? onAck + ) + => messages.Select(x => new ProducedMessage(x, metadata) { OnAck = onAck }); - static IEnumerable ConvertOne(object message, Metadata? metadata) - => new[] { new ProducedMessage(message, metadata) }; -} \ No newline at end of file + static IEnumerable ConvertOne(object message, Metadata? metadata, AcknowledgeProduce? onAck) + => new[] { new ProducedMessage(message, metadata) { OnAck = onAck } }; +} diff --git a/src/Core/src/Eventuous.Subscriptions/Context/ContextItems.cs b/src/Core/src/Eventuous.Subscriptions/Context/ContextItems.cs index 6c7006c3..684a58f0 100644 --- a/src/Core/src/Eventuous.Subscriptions/Context/ContextItems.cs +++ b/src/Core/src/Eventuous.Subscriptions/Context/ContextItems.cs @@ -36,5 +36,4 @@ public ContextItems AddItem(string key, object? value) { public static class ContextKeys { public const string GlobalPosition = nameof(GlobalPosition); public const string StreamPosition = nameof(StreamPosition); - public const string PartitionId = nameof(PartitionId); } \ No newline at end of file diff --git a/src/Core/src/Eventuous.Subscriptions/Context/DelayedAckConsumeContext.cs b/src/Core/src/Eventuous.Subscriptions/Context/DelayedAckConsumeContext.cs index 9e07592b..fe1475c1 100644 --- a/src/Core/src/Eventuous.Subscriptions/Context/DelayedAckConsumeContext.cs +++ b/src/Core/src/Eventuous.Subscriptions/Context/DelayedAckConsumeContext.cs @@ -40,4 +40,7 @@ public DelayedAckConsumeContext(IMessageConsumeContext inner, Acknowledge acknow /// Exception that occurred during message processing /// public ValueTask Fail(Exception exception) => _fail(this, exception); -} \ No newline at end of file + + public string? PartitionKey { get; internal set; } + public long PartitionId { get; internal set; } +} diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs index 1af7b092..9456a69d 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs @@ -39,7 +39,8 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c } } - await ctx.Acknowledge().NoContext(); + if (!ctx.HandlingResults.IsPending()) + await ctx.Acknowledge().NoContext(); } catch (TaskCanceledException) { ctx.Ignore(); diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/Partitioning/Partitioner.cs b/src/Core/src/Eventuous.Subscriptions/Filters/Partitioning/Partitioner.cs index fd8718eb..26774983 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/Partitioning/Partitioner.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/Partitioning/Partitioner.cs @@ -3,7 +3,7 @@ namespace Eventuous.Subscriptions.Filters.Partitioning; public static class Partitioner { - public delegate uint GetPartitionHash(IMessageConsumeContext context); + public delegate uint GetPartitionHash(string partitionKey); public delegate string GetPartitionKey(IMessageConsumeContext context); } \ No newline at end of file diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/PartitioningFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/PartitioningFilter.cs index 491ca804..4369eb8b 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/PartitioningFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/PartitioningFilter.cs @@ -5,27 +5,33 @@ namespace Eventuous.Subscriptions.Filters; public sealed class PartitioningFilter : ConsumeFilter, IAsyncDisposable { - readonly int _partitionCount; - readonly Partitioner.GetPartitionHash _partitioner; + readonly Partitioner.GetPartitionHash _getHash; + readonly Partitioner.GetPartitionKey _partitioner; readonly ConcurrentFilter[] _filters; + readonly int _partitionCount; - public PartitioningFilter(uint partitionCount, Partitioner.GetPartitionHash? partitioner = null) { + public PartitioningFilter( + uint partitionCount, + Partitioner.GetPartitionKey? partitioner = null, + Partitioner.GetPartitionHash? getHash = null + ) { + _getHash = getHash ?? MurmurHash3.Hash; _partitionCount = (int)partitionCount; - _partitioner = partitioner ?? (ctx => MurmurHash3.Hash(ctx.Stream)); + _partitioner = partitioner ?? (ctx => ctx.Stream); _filters = Enumerable.Range(0, _partitionCount).Select(_ => new ConcurrentFilter(1)).ToArray(); } - public PartitioningFilter(uint partitionCount, Partitioner.GetPartitionKey getPartitionKey) - : this(partitionCount, ctx => MurmurHash3.Hash(getPartitionKey(ctx))) { } - public override ValueTask Send(DelayedAckConsumeContext context, Func? next) { - var hash = _partitioner(context); - var partition = hash % _partitionCount; - return _filters[partition].Send(context.WithItem(ContextKeys.PartitionId, partition), next); + var partitionKey = _partitioner(context); + var hash = _getHash(partitionKey); + var partition = hash % _partitionCount; + context.PartitionKey = partitionKey; + context.PartitionId = partition; + return _filters[partition].Send(context, next); } public async ValueTask DisposeAsync() { Log.Stopping(nameof(PartitioningFilter), "concurrent filters", ""); await Task.WhenAll(_filters.Select(async x => await x.DisposeAsync())); } -} \ No newline at end of file +} diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs index c65f71c6..e6a5bdeb 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs @@ -31,9 +31,8 @@ public override async ValueTask Send( ) : Activity.Current; - if (activity?.IsAllDataRequested == true) { - var partitionId = context.Items.TryGetItem(ContextKeys.PartitionId); - activity.SetContextTags(context)?.SetTag(TelemetryTags.Eventuous.Partition, partitionId); + if (activity?.IsAllDataRequested == true && context is DelayedAckConsumeContext delayedAckContext) { + activity.SetContextTags(context)?.SetTag(TelemetryTags.Eventuous.Partition, delayedAckContext.PartitionId); } try { diff --git a/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingResult.cs b/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingResult.cs index 7f769c4e..d7d2e7ad 100644 --- a/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingResult.cs +++ b/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingResult.cs @@ -43,6 +43,8 @@ public IEnumerable GetResultsOf(EventHandlingStatus status) public EventHandlingStatus GetFailureStatus() => _handlingStatus & EventHandlingStatus.Handled; public EventHandlingStatus GetIgnoreStatus() => _handlingStatus & EventHandlingStatus.Ignored; + + public bool IsPending() => _handlingStatus == 0; public Exception? GetException() => _results.First(x => x.Exception != null).Exception; } \ No newline at end of file diff --git a/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingStatus.cs b/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingStatus.cs index 8bcc9129..eb2f9186 100644 --- a/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingStatus.cs +++ b/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingStatus.cs @@ -4,6 +4,7 @@ namespace Eventuous.Subscriptions; public enum EventHandlingStatus : short { Ignored = 0b_1000, Success = 0b_0001, + Pending = 0b_0010, Failure = 0b_0011, Handled = 0b_0111, // 0111 bitmask for Handled means that if any of the three lower bits is set, the message diff --git a/src/Core/src/Eventuous/Meta/MetaTags.cs b/src/Core/src/Eventuous/Meta/MetaTags.cs index f27a35af..02be5c16 100644 --- a/src/Core/src/Eventuous/Meta/MetaTags.cs +++ b/src/Core/src/Eventuous/Meta/MetaTags.cs @@ -1,4 +1,4 @@ -namespace Eventuous; +namespace Eventuous; public static class MetaTags { const string Prefix = "eventuous"; @@ -6,4 +6,4 @@ public static class MetaTags { public const string MessageId = $"{Prefix}.message-id"; public const string CorrelationId = $"{Prefix}.correlation-id"; public const string CausationId = $"{Prefix}.causation-id"; -} \ No newline at end of file +} diff --git a/src/Core/src/Eventuous/Meta/Metadata.cs b/src/Core/src/Eventuous/Meta/Metadata.cs index f39b0e8b..1b00418b 100644 --- a/src/Core/src/Eventuous/Meta/Metadata.cs +++ b/src/Core/src/Eventuous/Meta/Metadata.cs @@ -18,7 +18,7 @@ public Metadata With(string key, T? value) { return this; } - public string? GetString(string key) => TryGetValue(key, out var value) ? value.ToString() : default; + public string? GetString(string key) => TryGetValue(key, out var value) ? value?.ToString() : default; public T? Get(string key) => TryGetValue(key, out var value) && value is T v ? v : default; diff --git a/src/Core/src/Eventuous/Tools/AsyncHelper.cs b/src/Core/src/Eventuous/Tools/AsyncHelper.cs new file mode 100644 index 00000000..a20a1b33 --- /dev/null +++ b/src/Core/src/Eventuous/Tools/AsyncHelper.cs @@ -0,0 +1,24 @@ +namespace Eventuous; + +public static class AsyncHelper { + static readonly TaskFactory MyTaskFactory = new( + CancellationToken.None, + TaskCreationOptions.None, + TaskContinuationOptions.None, + TaskScheduler.Default + ); + + public static TResult RunSync(Func> func) + => MyTaskFactory + .StartNew(func) + .Unwrap() + .GetAwaiter() + .GetResult(); + + public static void RunSync(Func func) + => MyTaskFactory + .StartNew(func) + .Unwrap() + .GetAwaiter() + .GetResult(); +} diff --git a/src/Core/src/Eventuous/Tools/TaskExtensions.cs b/src/Core/src/Eventuous/Tools/TaskExtensions.cs index cd4bbf4e..d4b3b664 100644 --- a/src/Core/src/Eventuous/Tools/TaskExtensions.cs +++ b/src/Core/src/Eventuous/Tools/TaskExtensions.cs @@ -5,14 +5,11 @@ namespace Eventuous; public static class TaskExtensions { public static ConfiguredTaskAwaitable NoContext(this Task task) => task.ConfigureAwait(false); - public static ConfiguredTaskAwaitable NoContext(this Task task) - => task.ConfigureAwait(false); + public static ConfiguredTaskAwaitable NoContext(this Task task) => task.ConfigureAwait(false); - public static ConfiguredValueTaskAwaitable NoContext(this ValueTask task) - => task.ConfigureAwait(false); + public static ConfiguredValueTaskAwaitable NoContext(this ValueTask task) => task.ConfigureAwait(false); - public static ConfiguredValueTaskAwaitable NoContext(this ValueTask task) - => task.ConfigureAwait(false); + public static ConfiguredValueTaskAwaitable NoContext(this ValueTask task) => task.ConfigureAwait(false); public static ConfiguredCancelableAsyncEnumerable IgnoreWithCancellation( this IAsyncEnumerable source, @@ -20,27 +17,24 @@ CancellationToken cancellationToken ) => source.WithCancellation(cancellationToken).ConfigureAwait(false); + public static Task WhenAll(this IEnumerable tasks) => Task.WhenAll(tasks); + public static async Task WhenAll(this IEnumerable tasks) { var toAwait = tasks .Where(valueTask => !valueTask.IsCompletedSuccessfully) .Select(valueTask => valueTask.AsTask()) .ToList(); - if (toAwait.Count > 0) - await Task.WhenAll(toAwait).NoContext(); + if (toAwait.Count > 0) await Task.WhenAll(toAwait).NoContext(); } - public static async Task> WhenAll( - this IEnumerable> tasks - ) { + public static async Task> WhenAll(this IEnumerable> tasks) { var results = new List(); var toAwait = new List>(); foreach (var valueTask in tasks) { - if (valueTask.IsCompletedSuccessfully) - results.Add(valueTask.Result); - else - toAwait.Add(valueTask.AsTask()); + if (valueTask.IsCompletedSuccessfully) results.Add(valueTask.Result); + else toAwait.Add(valueTask.AsTask()); } if (toAwait.Count == 0) return results; @@ -49,4 +43,4 @@ this IEnumerable> tasks return results; } -} \ No newline at end of file +} diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/Eventuous.Tests.Subscriptions.csproj b/src/Core/test/Eventuous.Tests.Subscriptions/Eventuous.Tests.Subscriptions.csproj index 5c846615..b2ae1a9b 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/Eventuous.Tests.Subscriptions.csproj +++ b/src/Core/test/Eventuous.Tests.Subscriptions/Eventuous.Tests.Subscriptions.csproj @@ -1,5 +1,6 @@ + net6.0 true true true diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/HandlingStatusTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/HandlingStatusTests.cs index 86b7b9e2..7c9a93e7 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/HandlingStatusTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/HandlingStatusTests.cs @@ -24,6 +24,13 @@ public void NackAndIgnoreShouldNack() { (actual & EventHandlingStatus.Handled).Should().Be(EventHandlingStatus.Failure); } + [Fact] + public void PendingShouldBeHandled() { + const EventHandlingStatus actual = EventHandlingStatus.Pending; + (actual & EventHandlingStatus.Handled).Should().NotBe(EventHandlingStatus.Failure); + (actual & EventHandlingStatus.Handled).Should().NotBe(EventHandlingStatus.Ignored); + } + [Fact] public void IgnoredShouldBeIgnored() { const EventHandlingStatus actual = EventHandlingStatus.Ignored; @@ -37,6 +44,7 @@ public void NackAndIgnoreShouldFail() { context.Ignore("test"); context.HasFailed().Should().BeTrue(); context.WasIgnored().Should().BeFalse(); + context.HandlingResults.IsPending().Should().BeFalse(); } [Fact] @@ -47,6 +55,7 @@ public void NackAckAndIgnoreShouldFail() { context.Ignore(); context.HasFailed().Should().BeTrue(); context.WasIgnored().Should().BeFalse(); + context.HandlingResults.IsPending().Should().BeFalse(); } [Fact] @@ -56,6 +65,7 @@ public void AckAndIgnoreShouldSucceed() { context.Ignore(); context.HasFailed().Should().BeFalse(); context.WasIgnored().Should().BeFalse(); + context.HandlingResults.IsPending().Should().BeFalse(); } [Fact] @@ -64,5 +74,14 @@ public void IgnoreAndIgnoreShouldIgnore() { context.Ignore(); context.Ignore(); context.WasIgnored().Should().BeTrue(); + context.HandlingResults.IsPending().Should().BeFalse(); + } + + [Fact] + public void PendingShouldBePending() { + var context = Fixture.Create(); + context.WasIgnored().Should().BeFalse(); + context.HasFailed().Should().BeFalse(); + context.HandlingResults.IsPending().Should().BeTrue(); } } \ No newline at end of file diff --git a/src/Diagnostics/docker-compose.yml b/src/Diagnostics/docker-compose.yml new file mode 100644 index 00000000..de81180c --- /dev/null +++ b/src/Diagnostics/docker-compose.yml @@ -0,0 +1,8 @@ +version: '3.7' + +services: + zipkin: + image: openzipkin/zipkin + container_name: eventuous-zipkin + ports: + - "9411:9411" diff --git a/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/MetricsTests.cs b/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/MetricsTests.cs index 3b138bf7..7154f271 100644 --- a/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/MetricsTests.cs +++ b/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/MetricsTests.cs @@ -89,7 +89,7 @@ static object GetTag(MetricValue metric, string key) { public async Task InitializeAsync() { var testEvents = IntegrationFixture.Instance.Auto.CreateMany(Count).ToList(); var producer = _host.Services.GetRequiredService(); - await producer.Produce(_stream, testEvents); + await producer.Produce(_stream, testEvents, new Metadata()); await Task.Delay(1000); } diff --git a/src/Elastic/docker-compose.yml b/src/Elastic/docker-compose.yml new file mode 100644 index 00000000..37cafffb --- /dev/null +++ b/src/Elastic/docker-compose.yml @@ -0,0 +1,129 @@ +version: '3.7' + +services: + es01: + image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0 + container_name: eventuous-es01 + environment: + - node.name=es01 + - cluster.name=es-docker-cluster + - discovery.seed_hosts=es02,es03,es04 + - cluster.initial_master_nodes=es01,es02,es03,es04 + - node.roles=master,data_content,data_hot + - path.repo=/usr/share/elasticsearch/snapshots + - xpack.security.enabled=false + #- bootstrap.memory_lock=true + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + mem_limit: 1Gb + volumes: + - hot_data:/usr/share/elasticsearch/data + - snapshots:/usr/share/elasticsearch/snapshots + ports: + - "9200:9200" + networks: + - elastic + + es02: + image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0 + container_name: eventuous-es02 + environment: + - node.name=es02 + - cluster.name=es-docker-cluster + - discovery.seed_hosts=es01,es03,es04 + - cluster.initial_master_nodes=es01,es02,es03,es04 + - node.roles=master,data_warm + - path.repo=/usr/share/elasticsearch/snapshots + - xpack.security.enabled=false + #- bootstrap.memory_lock=true + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + mem_limit: 1Gb + volumes: + - warm_data:/usr/share/elasticsearch/data + - snapshots:/usr/share/elasticsearch/snapshots + networks: + - elastic + + es03: + image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0 + container_name: eventuous-es03 + environment: + - node.name=es03 + - cluster.name=es-docker-cluster + - discovery.seed_hosts=es01,es02,es04 + - cluster.initial_master_nodes=es01,es02,es03,es04 + - node.roles=master,data_cold + - path.repo=/usr/share/elasticsearch/snapshots + - xpack.security.enabled=false + #- bootstrap.memory_lock=true + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + mem_limit: 1Gb + volumes: + - cold_data:/usr/share/elasticsearch/data + - snapshots:/usr/share/elasticsearch/snapshots + networks: + - elastic + + es04: + image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0 + container_name: eventuous-es04 + environment: + - node.name=es04 + - cluster.name=es-docker-cluster + - discovery.seed_hosts=es01,es02,es03 + - cluster.initial_master_nodes=es01,es02,es03,es04 + - node.roles=master,data_frozen + - path.repo=/usr/share/elasticsearch/snapshots + - xpack.searchable.snapshot.shared_cache.size=10% + - xpack.security.enabled=false + #- bootstrap.memory_lock=true + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + mem_limit: 1Gb + volumes: + - frozen_data:/usr/share/elasticsearch/data + - snapshots:/usr/share/elasticsearch/snapshots + networks: + - elastic + + kib01: + image: docker.elastic.co/kibana/kibana:7.15.0 + container_name: eventuous-kibana + ports: + - "5601:5601" + environment: + ELASTICSEARCH_URL: http://es01:9200 + ELASTICSEARCH_HOSTS: '["http://es01:9200","http://es02:9200","http://es03:9200","http://es04:9200"]' + mem_limit: 1Gb + networks: + - elastic + +volumes: + hot_data: + driver: local + warm_data: + driver: local + cold_data: + driver: local + frozen_data: + driver: local + snapshots: + driver: local + +networks: + elastic: + driver: bridge \ No newline at end of file diff --git a/src/EventStore/docker-compose.yml b/src/EventStore/docker-compose.yml new file mode 100644 index 00000000..34f79b8b --- /dev/null +++ b/src/EventStore/docker-compose.yml @@ -0,0 +1,18 @@ +version: '3.7' + +services: + esdb: + container_name: eventuous-esdb + image: ghcr.io/eventstore/eventstore:21.10.0-alpha-arm64v8 + ports: + - '2113:2113' + - '1113:1113' + environment: + EVENTSTORE_INSECURE: 'true' + EVENTSTORE_CLUSTER_SIZE: 1 + EVENTSTORE_EXT_TCP_PORT: 1113 + EVENTSTORE_HTTP_PORT: 2113 + EVENTSTORE_ENABLE_EXTERNAL_TCP: 'true' + EVENTSTORE_RUN_PROJECTIONS: all + EVENTSTORE_START_STANDARD_PROJECTIONS: "true" + EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP: "true" diff --git a/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs b/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs index cb7f36bd..6c261727 100644 --- a/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs +++ b/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs @@ -92,9 +92,8 @@ void HandleDrop( global::EventStore.Client.StreamSubscription _, SubscriptionDroppedReason reason, Exception? ex - ) { - Dropped(EsdbMappings.AsDropReason(reason), ex); - } + ) + => Dropped(EsdbMappings.AsDropReason(reason), ex); } IMessageConsumeContext CreateContext(ResolvedEvent re, CancellationToken cancellationToken) { diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PersistentPublishAndSubscribeManyTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PersistentPublishAndSubscribeManyTests.cs index e1d971f2..907402b4 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PersistentPublishAndSubscribeManyTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PersistentPublishAndSubscribeManyTests.cs @@ -17,7 +17,7 @@ public async Task SubscribeAndProduceMany() { await Start(); - await Producer.Produce(Stream, testEvents); + await Producer.Produce(Stream, testEvents, new Metadata()); await Handler.Validate(10.Seconds()); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/ProducerTracesTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/ProducerTracesTests.cs index f211983c..1c29c222 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/ProducerTracesTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/ProducerTracesTests.cs @@ -30,7 +30,7 @@ public TracesTests(ITestOutputHelper outputHelper) public async Task ShouldPropagateRemoveContext() { var testEvent = Auto.Create(); - await Producer.Produce(Stream, testEvent); + await Producer.Produce(Stream, testEvent, new Metadata()); await Start(); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs index ae5ceb78..417bcc52 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs @@ -25,7 +25,7 @@ public async Task SubscribeAndProduceMany() { Handler.AssertThat().Exactly(count, x => testEvents.Contains(x)); await Start(); - await Producer.Produce(Stream, testEvents); + await Producer.Produce(Stream, testEvents, new Metadata()); await Handler.Validate(5.Seconds()); await Stop(); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyTests.cs index 54502e97..bb59db11 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyTests.cs @@ -17,7 +17,7 @@ public async Task SubscribeAndProduceMany() { await Start(); - await Producer.Produce(Stream, testEvents); + await Producer.Produce(Stream, testEvents, new Metadata()); await Handler.Validate(10.Seconds()); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeOneTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeOneTests.cs index 8167a10e..3634fcbe 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeOneTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeOneTests.cs @@ -13,7 +13,7 @@ public async Task SubscribeAndProduce() { var testEvent = Auto.Create(); Handler.AssertThat().Any(x => x as TestEvent == testEvent); - await Producer.Produce(Stream, testEvent); + await Producer.Produce(Stream, testEvent, new Metadata()); await Start(); await Handler.Validate(10.Seconds()); diff --git a/src/Extensions/src/Eventuous.AspNetCore.Web/RouteBuilderExtensions.cs b/src/Extensions/src/Eventuous.AspNetCore.Web/RouteBuilderExtensions.cs index 37dd3146..2a790d08 100644 --- a/src/Extensions/src/Eventuous.AspNetCore.Web/RouteBuilderExtensions.cs +++ b/src/Extensions/src/Eventuous.AspNetCore.Web/RouteBuilderExtensions.cs @@ -100,7 +100,7 @@ void MapAssemblyCommands(Assembly assembly) { ); var genericMethod = method.MakeGenericMethod(typeof(TAggregate), type); - genericMethod.Invoke(null, new object?[] { builder, attr.Route }); + genericMethod.Invoke(null, new object?[] { builder, attr.Route, null }); } } diff --git a/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/HttpCommandTests.cs b/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/HttpCommandTests.cs index 83cf3311..bef8302f 100644 --- a/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/HttpCommandTests.cs +++ b/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/HttpCommandTests.cs @@ -1,5 +1,6 @@ using System.Net; using System.Text.Json; +using Eventuous.AspNetCore.Web; using Eventuous.Sut.AspNetCore; using Eventuous.Sut.Domain; using Eventuous.TestHelpers; @@ -83,3 +84,6 @@ public async Task MapEnrichedCommand() { } record BookRoom(string BookingId, string RoomId, LocalDate CheckIn, LocalDate CheckOut, decimal Price); + +[HttpCommand(Route = "book")] +record BookAnotherRoom(string BookingId, string RoomId, LocalDate CheckIn, LocalDate CheckOut, decimal Price); diff --git a/src/Experimental/src/Eventuous.Connectors.Base/Configuration.cs b/src/Gateway/src/Eventuous.Connectors.Base/Configuration.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/Configuration.cs rename to src/Gateway/src/Eventuous.Connectors.Base/Configuration.cs diff --git a/src/Experimental/src/Eventuous.Connectors.Base/ConnectorApplication.cs b/src/Gateway/src/Eventuous.Connectors.Base/ConnectorApplication.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/ConnectorApplication.cs rename to src/Gateway/src/Eventuous.Connectors.Base/ConnectorApplication.cs diff --git a/src/Experimental/src/Eventuous.Connectors.Base/ConnectorBuilder.cs b/src/Gateway/src/Eventuous.Connectors.Base/ConnectorBuilder.cs similarity index 87% rename from src/Experimental/src/Eventuous.Connectors.Base/ConnectorBuilder.cs rename to src/Gateway/src/Eventuous.Connectors.Base/ConnectorBuilder.cs index 182b7d40..f8886ba9 100644 --- a/src/Experimental/src/Eventuous.Connectors.Base/ConnectorBuilder.cs +++ b/src/Gateway/src/Eventuous.Connectors.Base/ConnectorBuilder.cs @@ -1,3 +1,4 @@ +using System.Diagnostics.CodeAnalysis; using Eventuous.Gateway; using Eventuous.Producers; using Eventuous.Subscriptions; @@ -9,6 +10,8 @@ namespace Microsoft.Extensions.DependencyInjection; public class ConnectorBuilder { + [PublicAPI] + [SuppressMessage("Performance", "CA1822:Mark members as static")] public ConnectorBuilder SubscribeWith( string subscriptionId ) where TSubscription : EventSubscription where TSubscriptionOptions : SubscriptionOptions @@ -21,6 +24,7 @@ public class ConnectorBuilder : ConnectorBu internal ConnectorBuilder(string subscriptionId) => SubscriptionId = subscriptionId; + [PublicAPI] public ConnectorBuilder ConfigureSubscriptionOptions( Action configureOptions ) { @@ -28,6 +32,7 @@ Action configureOptions return this; } + [PublicAPI] public ConnectorBuilder ConfigureSubscription( Action> configure ) { @@ -35,10 +40,11 @@ Action> configure return this; } + [PublicAPI] public ConnectorBuilder - ProduceWith() + ProduceWith(bool awaitProduce = false) where TProducer : class, IEventProducer where TProduceOptions : class - => new(this); + => new(this, awaitProduce); internal void ConfigureOptions(TSubscriptionOptions options) => _configureOptions?.Invoke(options); @@ -56,10 +62,15 @@ public class ConnectorBuilder _inner; Func>? _getTransformer; + readonly bool _awaitProduce; Type? _transformerType; - public ConnectorBuilder(ConnectorBuilder inner) => _inner = inner; + public ConnectorBuilder(ConnectorBuilder inner, bool awaitProduce) { + _inner = inner; + _awaitProduce = awaitProduce; + } + [PublicAPI] public ConnectorBuilder TransformWith( Func getTransformer ) where T : class, IGatewayTransform { @@ -91,7 +102,8 @@ IEventHandler GetHandler(IServiceProvider sp) { return new GatewayHandler( new GatewayProducer(producer), - transform!.RouteAndTransform + transform!.RouteAndTransform, + _awaitProduce ); } } diff --git a/src/Experimental/src/Eventuous.Connectors.Base/ConnectorConfig.cs b/src/Gateway/src/Eventuous.Connectors.Base/ConnectorConfig.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/ConnectorConfig.cs rename to src/Gateway/src/Eventuous.Connectors.Base/ConnectorConfig.cs diff --git a/src/Experimental/src/Eventuous.Connectors.Base/ConnectorRegistration.cs b/src/Gateway/src/Eventuous.Connectors.Base/ConnectorRegistration.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/ConnectorRegistration.cs rename to src/Gateway/src/Eventuous.Connectors.Base/ConnectorRegistration.cs diff --git a/src/Experimental/src/Eventuous.Connectors.Base/Eventuous.Connectors.Base.csproj b/src/Gateway/src/Eventuous.Connectors.Base/Eventuous.Connectors.Base.csproj similarity index 91% rename from src/Experimental/src/Eventuous.Connectors.Base/Eventuous.Connectors.Base.csproj rename to src/Gateway/src/Eventuous.Connectors.Base/Eventuous.Connectors.Base.csproj index b0e3cdb6..d0ebfeda 100644 --- a/src/Experimental/src/Eventuous.Connectors.Base/Eventuous.Connectors.Base.csproj +++ b/src/Gateway/src/Eventuous.Connectors.Base/Eventuous.Connectors.Base.csproj @@ -18,7 +18,7 @@ - + diff --git a/src/Experimental/src/Eventuous.Connectors.Base/ExporterMappings.cs b/src/Gateway/src/Eventuous.Connectors.Base/ExporterMappings.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/ExporterMappings.cs rename to src/Gateway/src/Eventuous.Connectors.Base/ExporterMappings.cs diff --git a/src/Experimental/src/Eventuous.Connectors.Base/Hosting.cs b/src/Gateway/src/Eventuous.Connectors.Base/Hosting.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/Hosting.cs rename to src/Gateway/src/Eventuous.Connectors.Base/Hosting.cs diff --git a/src/Experimental/src/Eventuous.Connectors.Base/Logging.cs b/src/Gateway/src/Eventuous.Connectors.Base/Logging.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/Logging.cs rename to src/Gateway/src/Eventuous.Connectors.Base/Logging.cs diff --git a/src/Experimental/src/Eventuous.Connectors.Base/StartupJob.cs b/src/Gateway/src/Eventuous.Connectors.Base/StartupJob.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/StartupJob.cs rename to src/Gateway/src/Eventuous.Connectors.Base/StartupJob.cs diff --git a/src/Experimental/src/Eventuous.Connectors.Base/VersionParser.cs b/src/Gateway/src/Eventuous.Connectors.Base/VersionParser.cs similarity index 100% rename from src/Experimental/src/Eventuous.Connectors.Base/VersionParser.cs rename to src/Gateway/src/Eventuous.Connectors.Base/VersionParser.cs diff --git a/src/Shovel/src/Eventuous.Gateway/Eventuous.Gateway.csproj b/src/Gateway/src/Eventuous.Gateway/Eventuous.Gateway.csproj similarity index 100% rename from src/Shovel/src/Eventuous.Gateway/Eventuous.Gateway.csproj rename to src/Gateway/src/Eventuous.Gateway/Eventuous.Gateway.csproj diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs new file mode 100644 index 00000000..b37217b0 --- /dev/null +++ b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs @@ -0,0 +1,47 @@ +using Eventuous.Subscriptions.Context; + +namespace Eventuous.Gateway; + +public delegate ValueTask RouteAndTransform(IMessageConsumeContext context); + +class GatewayHandler : BaseEventHandler { + readonly IEventProducer _eventProducer; + readonly RouteAndTransform _transform; + readonly bool _awaitProduce; + + public GatewayHandler(IEventProducer eventProducer, RouteAndTransform transform, bool awaitProduce) { + _eventProducer = eventProducer; + _transform = transform; + _awaitProduce = awaitProduce; + } + + public override async ValueTask HandleEvent(IMessageConsumeContext context) { + var shovelMessages = await _transform(context).NoContext(); + + if (shovelMessages.Length == 0) return EventHandlingStatus.Ignored; + + AcknowledgeProduce? onAck = null; + + if (context is DelayedAckConsumeContext delayed) { + onAck = _ => delayed.Acknowledge(); + } + + var grouped = shovelMessages.GroupBy(x => x.TargetStream); + + try { + await grouped.Select(x => ProduceToStream(x.Key, x)).WhenAll(); + } + catch (OperationCanceledException e) { + context.Nack(e); + } + + return _awaitProduce ? EventHandlingStatus.Success : EventHandlingStatus.Pending; + + Task ProduceToStream(StreamName streamName, IEnumerable toProduce) { + var messages = toProduce + .Select(x => new ProducedMessage(x.Message, x.GetMeta(context)) { OnAck = onAck }); + + return _eventProducer.Produce(streamName, messages, context.CancellationToken); + } + } +} diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs b/src/Gateway/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs new file mode 100644 index 00000000..44129b5f --- /dev/null +++ b/src/Gateway/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs @@ -0,0 +1,65 @@ +using Eventuous.Subscriptions.Context; + +namespace Eventuous.Gateway; + +public delegate ValueTask[]> RouteAndTransform( + IMessageConsumeContext message +); + +public class GatewayHandler : BaseEventHandler + where TProduceOptions : class { + readonly IEventProducer _eventProducer; + + readonly RouteAndTransform _transform; + readonly bool _awaitProduce; + + public GatewayHandler( + IEventProducer eventProducer, + RouteAndTransform transform, + bool awaitProduce + ) { + _eventProducer = eventProducer; + _transform = transform; + _awaitProduce = awaitProduce; + } + + public override async ValueTask HandleEvent(IMessageConsumeContext context) { + var shovelMessages = await _transform(context).NoContext(); + + if (shovelMessages.Length == 0) return EventHandlingStatus.Ignored; + + AcknowledgeProduce? onAck = null; + + if (context is DelayedAckConsumeContext delayed) { + onAck = _ => delayed.Acknowledge(); + } + + try { + var grouped = shovelMessages.GroupBy(x => x.TargetStream); + + await grouped + .Select(x => ProduceToStream(x.Key, x)) + .WhenAll() + .NoContext(); + } + catch (OperationCanceledException e) { + context.Nack(e); + } + + return _awaitProduce ? EventHandlingStatus.Success : EventHandlingStatus.Pending; + + Task ProduceToStream(StreamName streamName, IEnumerable> toProduce) + => toProduce.Select( + x => + _eventProducer.Produce( + streamName, + x.Message, + x.GetMeta(context), + x.ProduceOptions, + onAck, + context.CancellationToken + ) + ) + .WhenAll(); + } +} diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayMessage.cs b/src/Gateway/src/Eventuous.Gateway/GatewayMessage.cs new file mode 100644 index 00000000..508bc46b --- /dev/null +++ b/src/Gateway/src/Eventuous.Gateway/GatewayMessage.cs @@ -0,0 +1,11 @@ +namespace Eventuous.Gateway; + +public record GatewayMessage(StreamName TargetStream, object Message, Metadata? Metadata); + +[PublicAPI] +public record GatewayMessage( + StreamName TargetStream, + object Message, + Metadata? Metadata, + TProduceOptions ProduceOptions +) : GatewayMessage(TargetStream, Message, Metadata); diff --git a/src/Shovel/src/Eventuous.Gateway/GatewayMetaHelper.cs b/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs similarity index 61% rename from src/Shovel/src/Eventuous.Gateway/GatewayMetaHelper.cs rename to src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs index f7b745ce..bbdf4d05 100644 --- a/src/Shovel/src/Eventuous.Gateway/GatewayMetaHelper.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs @@ -3,8 +3,8 @@ namespace Eventuous.Gateway; static class GatewayMetaHelper { - public static Metadata GetMeta(this GatewayContext gatewayContext, IMessageConsumeContext context) { - var (_, _, metadata) = gatewayContext; + public static Metadata GetMeta(this GatewayMessage gatewayMessage, IMessageConsumeContext context) { + var (_, _, metadata) = gatewayMessage; var meta = metadata == null ? new Metadata() : new Metadata(metadata); return meta.WithCausationId(context.MessageId); } diff --git a/src/Shovel/src/Eventuous.Gateway/GatewayProducer.cs b/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs similarity index 100% rename from src/Shovel/src/Eventuous.Gateway/GatewayProducer.cs rename to src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs diff --git a/src/Gateway/src/Eventuous.Gateway/IGatewayTransform.cs b/src/Gateway/src/Eventuous.Gateway/IGatewayTransform.cs new file mode 100644 index 00000000..d9b919ab --- /dev/null +++ b/src/Gateway/src/Eventuous.Gateway/IGatewayTransform.cs @@ -0,0 +1,11 @@ +using Eventuous.Subscriptions.Context; + +namespace Eventuous.Gateway; + +public interface IGatewayTransform { + ValueTask RouteAndTransform(IMessageConsumeContext context); +} + +public interface IGatewayTransform { + ValueTask[]> RouteAndTransform(IMessageConsumeContext context); +} diff --git a/src/Gateway/src/Eventuous.Gateway/Registrations/GatewayRegistrations.cs b/src/Gateway/src/Eventuous.Gateway/Registrations/GatewayRegistrations.cs new file mode 100644 index 00000000..70e500bd --- /dev/null +++ b/src/Gateway/src/Eventuous.Gateway/Registrations/GatewayRegistrations.cs @@ -0,0 +1,101 @@ +// ReSharper disable CheckNamespace + +using Eventuous.Gateway; +using Eventuous.Subscriptions.Registrations; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Microsoft.Extensions.DependencyInjection; + +[PublicAPI] +public static class GatewayRegistrations { + public static IServiceCollection AddGateway( + this IServiceCollection services, + string subscriptionId, + RouteAndTransform routeAndTransform, + Action? configureSubscription = null, + Action>? configureBuilder = null, + bool awaitProduce = true + ) + where TSubscription : EventSubscription + where TProducer : class, IEventProducer + where TSubscriptionOptions : SubscriptionOptions { + services.AddSubscription( + subscriptionId, + builder => { + builder.Configure(configureSubscription); + configureBuilder?.Invoke(builder); + + builder.AddEventHandler( + sp => new GatewayHandler( + new GatewayProducer(sp.GetRequiredService()), + routeAndTransform, + awaitProduce + ) + ); + } + ); + + return services; + } + + public static IServiceCollection AddGateway( + this IServiceCollection services, + string subscriptionId, + Action? configureSubscription = null, + Action>? configureBuilder = null, + bool awaitProduce = true + ) + where TSubscription : EventSubscription + where TProducer : class, IEventProducer + where TSubscriptionOptions : SubscriptionOptions { + services.AddSubscription( + subscriptionId, + builder => { + builder.Configure(configureSubscription); + configureBuilder?.Invoke(builder); + builder.AddEventHandler(GetHandler); + } + ); + + return services; + + IEventHandler GetHandler(IServiceProvider sp) { + var transform = sp.GetRequiredService(); + var producer = sp.GetRequiredService(); + + return new GatewayHandler(new GatewayProducer(producer), transform, awaitProduce); + } + } + + public static IServiceCollection AddGateway( + this IServiceCollection services, + string subscriptionId, + Action? configureSubscription = null, + Action>? configureBuilder = null, + bool awaitProduce = true + ) + where TSubscription : EventSubscription + where TProducer : class, IEventProducer + where TTransform : class, IGatewayTransform + where TSubscriptionOptions : SubscriptionOptions { + services.TryAddSingleton(); + + services.AddSubscription( + subscriptionId, + builder => { + builder.Configure(configureSubscription); + configureBuilder?.Invoke(builder); + builder.AddEventHandler(GetHandler); + } + ); + + return services; + + IEventHandler GetHandler(IServiceProvider sp) { + var transform = sp.GetRequiredService(); + var producer = sp.GetRequiredService(); + + return new GatewayHandler(new GatewayProducer(producer), transform.RouteAndTransform, awaitProduce); + } + } +} diff --git a/src/Shovel/src/Eventuous.Gateway/Registrations/GatewayWithOptionsRegistrations.cs b/src/Gateway/src/Eventuous.Gateway/Registrations/GatewayWithOptionsRegistrations.cs similarity index 87% rename from src/Shovel/src/Eventuous.Gateway/Registrations/GatewayWithOptionsRegistrations.cs rename to src/Gateway/src/Eventuous.Gateway/Registrations/GatewayWithOptionsRegistrations.cs index b0216328..54b31b9b 100644 --- a/src/Shovel/src/Eventuous.Gateway/Registrations/GatewayWithOptionsRegistrations.cs +++ b/src/Gateway/src/Eventuous.Gateway/Registrations/GatewayWithOptionsRegistrations.cs @@ -13,7 +13,8 @@ public static IServiceCollection AddGateway routeAndTransform, Action? configureSubscription = null, - Action>? configureBuilder = null + Action>? configureBuilder = null, + bool awaitProduce = true ) where TSubscription : EventSubscription where TProducer : class, IEventProducer @@ -24,10 +25,12 @@ public static IServiceCollection AddGateway { builder.Configure(configureSubscription); configureBuilder?.Invoke(builder); + builder.AddEventHandler( sp => new GatewayHandler( new GatewayProducer(sp.GetRequiredService()), - routeAndTransform + routeAndTransform, + awaitProduce ) ); } @@ -40,7 +43,8 @@ public static IServiceCollection AddGateway? configureSubscription = null, - Action>? configureBuilder = null + Action>? configureBuilder = null, + bool awaitProduce = true ) where TSubscription : EventSubscription where TProducer : class, IEventProducer @@ -63,7 +67,8 @@ IEventHandler GetHandler(IServiceProvider sp) { return new GatewayHandler( new GatewayProducer(producer), - transform + transform, + awaitProduce ); } } @@ -73,7 +78,8 @@ IEventHandler GetHandler(IServiceProvider sp) { this IServiceCollection services, string subscriptionId, Action? configureSubscription = null, - Action>? configureBuilder = null + Action>? configureBuilder = null, + bool awaitProduce = true ) where TSubscription : EventSubscription where TProducer : class, IEventProducer @@ -99,7 +105,8 @@ IEventHandler GetHandler(IServiceProvider sp) { return new GatewayHandler( new GatewayProducer(producer), - transform.RouteAndTransform + transform.RouteAndTransform, + awaitProduce ); } } diff --git a/src/Shovel/test/Eventuous.Gateway.Tests/Eventuous.Gateway.Tests.csproj b/src/Gateway/test/Eventuous.Gateway.Tests/Eventuous.Gateway.Tests.csproj similarity index 100% rename from src/Shovel/test/Eventuous.Gateway.Tests/Eventuous.Gateway.Tests.csproj rename to src/Gateway/test/Eventuous.Gateway.Tests/Eventuous.Gateway.Tests.csproj diff --git a/src/Shovel/test/Eventuous.Gateway.Tests/RegistrationTests.cs b/src/Gateway/test/Eventuous.Gateway.Tests/RegistrationTests.cs similarity index 92% rename from src/Shovel/test/Eventuous.Gateway.Tests/RegistrationTests.cs rename to src/Gateway/test/Eventuous.Gateway.Tests/RegistrationTests.cs index 2eb8c603..2ba6b93a 100644 --- a/src/Shovel/test/Eventuous.Gateway.Tests/RegistrationTests.cs +++ b/src/Gateway/test/Eventuous.Gateway.Tests/RegistrationTests.cs @@ -29,13 +29,13 @@ public static void ConfigureServices(IServiceCollection services) { services.AddGateway("shovel2"); } - static ValueTask RouteAndTransform(object message) => new(); + static ValueTask RouteAndTransform(object message) => new(); public void Configure(IApplicationBuilder app) { } } class TestTransform : IGatewayTransform { - public ValueTask RouteAndTransform(IMessageConsumeContext context) + public ValueTask RouteAndTransform(IMessageConsumeContext context) => new(); } diff --git a/src/Shovel/test/Eventuous.Gateway.Tests/RegistrationTestsWithOptions.cs b/src/Gateway/test/Eventuous.Gateway.Tests/RegistrationTestsWithOptions.cs similarity index 89% rename from src/Shovel/test/Eventuous.Gateway.Tests/RegistrationTestsWithOptions.cs rename to src/Gateway/test/Eventuous.Gateway.Tests/RegistrationTestsWithOptions.cs index 34846f8f..4e42d1eb 100644 --- a/src/Shovel/test/Eventuous.Gateway.Tests/RegistrationTestsWithOptions.cs +++ b/src/Gateway/test/Eventuous.Gateway.Tests/RegistrationTestsWithOptions.cs @@ -1,5 +1,4 @@ -using Eventuous.Gateway; -using Eventuous.Producers; +using Eventuous.Producers; using Eventuous.Subscriptions; using Eventuous.Subscriptions.Context; using Eventuous.Subscriptions.Filters; @@ -29,13 +28,13 @@ public static void ConfigureServices(IServiceCollection services) { services.AddGateway("shovel2"); } - static ValueTask?> RouteAndTransform(object message) => new(); + static ValueTask[]> RouteAndTransform(object message) => new(); public void Configure(IApplicationBuilder app) { } } class TestTransform : IGatewayTransform { - public ValueTask?> RouteAndTransform(IMessageConsumeContext context) + public ValueTask[]> RouteAndTransform(IMessageConsumeContext context) => new(); } diff --git a/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/GooglePubSubProducer.cs b/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/GooglePubSubProducer.cs index 15ad9c9e..4306df06 100644 --- a/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/GooglePubSubProducer.cs +++ b/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/GooglePubSubProducer.cs @@ -107,7 +107,8 @@ PubsubMessage CreateMessage(ProducedMessage message, PubSubProduceOptions? optio message.Metadata.Remove(MetaTags.MessageId); foreach (var (key, value) in message.Metadata) { - psm.Attributes.Add(key, value.ToString()); + if (value != null) + psm.Attributes.Add(key, value.ToString()); } } diff --git a/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs b/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs index 5900a19d..74993ed0 100644 --- a/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs +++ b/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs @@ -129,7 +129,7 @@ async Task Handle(PubsubMessage msg, CancellationToken ct) { } Metadata AsMeta(MapField attributes) - => new(attributes.ToDictionary(x => x.Key, x => (object)x.Value)); + => new(attributes.ToDictionary(x => x.Key, x => (object)x.Value)!); } protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) { diff --git a/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubTests.cs b/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubTests.cs index c471bf69..0deb3f9e 100644 --- a/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubTests.cs +++ b/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubTests.cs @@ -44,7 +44,7 @@ public async Task SubscribeAndProduce() { var testEvent = Auto.Create(); _handler.AssertThat().Any(x => x as TestEvent == testEvent); - await _producer.Produce(_pubsubTopic, testEvent); + await _producer.Produce(_pubsubTopic, testEvent, null); await _handler.Validate(10.Seconds()); } @@ -56,7 +56,7 @@ public async Task SubscribeAndProduceMany() { var testEvents = Auto.CreateMany(count).ToList(); _handler.AssertThat().Exactly(count, x => testEvents.Contains(x)); - await _producer.Produce(_pubsubTopic, testEvents); + await _producer.Produce(_pubsubTopic, testEvents, null); await _handler.Validate(10.Seconds()); } diff --git a/src/Kafka/docker-compose.yml b/src/Kafka/docker-compose.yml new file mode 100644 index 00000000..f4b63f12 --- /dev/null +++ b/src/Kafka/docker-compose.yml @@ -0,0 +1,60 @@ +version: '3.7' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.0.1 + hostname: zookeeper + container_name: eventuous-zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:7.0.1 + container_name: eventuous-kafka + hostname: kafka + ports: + # To learn about configuring Kafka for access across networks see + # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:19092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + + kafka-schema-registry: + image: confluentinc/cp-schema-registry:7.0.1 + container_name: eventuous-kafka-schema-registry + hostname: schema-registry + depends_on: + - zookeeper + - kafka + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:19092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - "8080:8080" + depends_on: + - zookeeper + - kafka + - kafka1 + - schemaregistry + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:19092 + KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 + KAFKA_CLUSTERS_0_JMXPORT: 9997 + KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry:8085 \ No newline at end of file diff --git a/src/Kafka/src/Eventuous.Kafka/Eventuous.Kafka.csproj b/src/Kafka/src/Eventuous.Kafka/Eventuous.Kafka.csproj new file mode 100644 index 00000000..45ea4bff --- /dev/null +++ b/src/Kafka/src/Eventuous.Kafka/Eventuous.Kafka.csproj @@ -0,0 +1,19 @@ + + + README.md + + + + + + + + + + + + + + + + diff --git a/src/Kafka/src/Eventuous.Kafka/KafkaHeaderKeys.cs b/src/Kafka/src/Eventuous.Kafka/KafkaHeaderKeys.cs new file mode 100644 index 00000000..fb18343a --- /dev/null +++ b/src/Kafka/src/Eventuous.Kafka/KafkaHeaderKeys.cs @@ -0,0 +1,6 @@ +namespace Eventuous.Kafka; + +public static class KafkaHeaderKeys { + public static string MessageTypeHeader { get; set; } = "message-type"; + public static string ContentTypeHeader { get; set; } = "content-type"; +} diff --git a/src/Kafka/src/Eventuous.Kafka/MetadataExtensions.cs b/src/Kafka/src/Eventuous.Kafka/MetadataExtensions.cs new file mode 100644 index 00000000..f330cfcb --- /dev/null +++ b/src/Kafka/src/Eventuous.Kafka/MetadataExtensions.cs @@ -0,0 +1,36 @@ +using System.Text; +using Confluent.Kafka; + +namespace Eventuous.Kafka; + +static class MetadataExtensions { + public static Headers AsKafkaHeaders(this Metadata metadata) { + var headers = new Headers(); + + // ReSharper disable once ForeachCanBePartlyConvertedToQueryUsingAnotherGetEnumerator + foreach (var entry in metadata) { + if (entry.Key == MetaTags.MessageId) continue; + + headers.AddHeader(entry.Key, entry.Value?.ToString()); + } + + return headers; + } + + public static Headers AddHeader(this Headers headers, string key, string? value) { + if (value != null) { + headers.Add(key, Encoding.UTF8.GetBytes(value)); + } + return headers; + } + + public static Metadata AsMetadata(this Headers headers) { + var metadata = new Metadata(); + + foreach (var header in headers) { + metadata.Add(header.Key, Encoding.UTF8.GetString(header.GetValueBytes())); + } + + return metadata; + } +} diff --git a/src/Kafka/src/Eventuous.Kafka/Producers/KafkaBasicProducer.cs b/src/Kafka/src/Eventuous.Kafka/Producers/KafkaBasicProducer.cs new file mode 100644 index 00000000..800408cf --- /dev/null +++ b/src/Kafka/src/Eventuous.Kafka/Producers/KafkaBasicProducer.cs @@ -0,0 +1,107 @@ +using Confluent.Kafka; +using Eventuous.Producers; +using Eventuous.Producers.Diagnostics; +using Microsoft.Extensions.Hosting; + +namespace Eventuous.Kafka.Producers; + +/// +/// Produces messages with byte[] payload without using the schema registry. The message type is specified in the +/// headers, so the type mapping is required. +/// +public class KafkaBasicProducer : BaseProducer, IHostedService { + readonly IProducer _producerWithKey; + readonly IProducer _producerWithoutKey; + readonly IEventSerializer _serializer; + + public KafkaBasicProducer(KafkaProducerOptions options, IEventSerializer? serializer = null) : + base(TracingOptions) { + _producerWithKey = new ProducerBuilder(options.ProducerConfig).Build(); + _producerWithoutKey = new DependentProducerBuilder(_producerWithKey.Handle).Build(); + + _serializer = serializer ?? DefaultEventSerializer.Instance; + } + + static readonly ProducerTracingOptions TracingOptions = new() { + MessagingSystem = "kafka", + DestinationKind = "topic", + ProduceOperation = "produce" + }; + + protected override async Task ProduceMessages( + StreamName stream, + IEnumerable messages, + KafkaProduceOptions? options, + CancellationToken cancellationToken = default + ) { + foreach (var producedMessage in messages) { + var serialized = _serializer.SerializeEvent(producedMessage.Message); + var headers = producedMessage.Metadata?.AsKafkaHeaders() ?? new Headers(); + + headers + .AddHeader(KafkaHeaderKeys.MessageTypeHeader, serialized.EventType) + .AddHeader(KafkaHeaderKeys.ContentTypeHeader, serialized.ContentType); + + await ProduceLocal(); + + Task ProduceLocal() => options?.PartitionKey != null ? ProducePartitioned() : ProduceNotPartitioned(); + + async Task ProducePartitioned() { + var message = new Message { + Value = serialized.Payload, + Key = options.PartitionKey, + Headers = headers + }; + + if (producedMessage.OnAck == null) { + await _producerWithKey.ProduceAsync(stream, message, cancellationToken).NoContext(); + } + else { + _producerWithKey.Produce(stream, message, r => DeliveryHandler(r, producedMessage)); + } + + void DeliveryHandler(DeliveryReport report, ProducedMessage msg) + => Report(report.Error, msg); + } + + async Task ProduceNotPartitioned() { + var message = new Message { + Value = serialized.Payload, + Headers = headers + }; + + if (producedMessage.OnAck == null) { + await _producerWithoutKey.ProduceAsync(stream, message, cancellationToken).NoContext(); + } + else { + _producerWithoutKey.Produce(stream, message, r => DeliveryHandler(r, producedMessage)); + } + + void DeliveryHandler(DeliveryReport report, ProducedMessage msg) + => Report(report.Error, msg); + } + + void Report(Error error, ProducedMessage message) { + if (error.IsError) { + producedMessage.OnNack?.Invoke(message, error.Reason, null).NoContext().GetAwaiter().GetResult(); + } + + producedMessage.OnAck(message).NoContext().GetAwaiter().GetResult(); + } + } + } + + public Task StartAsync(CancellationToken cancellationToken) { + ReadyNow(); + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) { + while (!cancellationToken.IsCancellationRequested) { + var count = _producerWithKey.Flush(TimeSpan.FromSeconds(10)); + if (count == 0) break; + + await Task.Delay(100, cancellationToken).NoContext(); + } + } +} diff --git a/src/Kafka/src/Eventuous.Kafka/Producers/KafkaProduceOptions.cs b/src/Kafka/src/Eventuous.Kafka/Producers/KafkaProduceOptions.cs new file mode 100644 index 00000000..4e282b2a --- /dev/null +++ b/src/Kafka/src/Eventuous.Kafka/Producers/KafkaProduceOptions.cs @@ -0,0 +1,3 @@ +namespace Eventuous.Kafka.Producers; + +public record KafkaProduceOptions(string PartitionKey); diff --git a/src/Kafka/src/Eventuous.Kafka/Producers/KafkaProducerOptions.cs b/src/Kafka/src/Eventuous.Kafka/Producers/KafkaProducerOptions.cs new file mode 100644 index 00000000..bd05dcd4 --- /dev/null +++ b/src/Kafka/src/Eventuous.Kafka/Producers/KafkaProducerOptions.cs @@ -0,0 +1,5 @@ +using Confluent.Kafka; + +namespace Eventuous.Kafka.Producers; + +public record KafkaProducerOptions(ProducerConfig ProducerConfig); diff --git a/src/Kafka/src/Eventuous.Kafka/README.md b/src/Kafka/src/Eventuous.Kafka/README.md new file mode 100644 index 00000000..36c22b1b --- /dev/null +++ b/src/Kafka/src/Eventuous.Kafka/README.md @@ -0,0 +1 @@ +# Eventuous Kafka \ No newline at end of file diff --git a/src/Kafka/start.sh b/src/Kafka/start.sh new file mode 100755 index 00000000..44f4bcd2 --- /dev/null +++ b/src/Kafka/start.sh @@ -0,0 +1 @@ +docker compose -f ../EventStore/docker-compose.yml -f ../Mongo/docker-compose.yml -f docker-compose.yml up \ No newline at end of file diff --git a/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs b/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs new file mode 100644 index 00000000..2ca27e03 --- /dev/null +++ b/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs @@ -0,0 +1,126 @@ +using Confluent.Kafka; +using Eventuous.Kafka; +using Eventuous.Kafka.Producers; +using Eventuous.Producers; +using Eventuous.Sut.Subs; + +namespace Eventuous.Tests.Kafka; + +public class BasicProducerTests { + readonly ITestOutputHelper _output; + + public BasicProducerTests(ITestOutputHelper output) => _output = output; + + const string BrokerList = "localhost:9092"; + + static readonly Fixture Auto = new(); + + [Fact] + public async Task ShouldProduceAndWait() { + var topicName = Auto.Create(); + _output.WriteLine($"Topic: {topicName}"); + + var producer = new KafkaBasicProducer( + new KafkaProducerOptions(new ProducerConfig { BootstrapServers = BrokerList }) + ); + + var produced = new List(); + + var events = Auto.CreateMany().ToArray(); + await producer.StartAsync(default); + + ValueTask OnAck(ProducedMessage msg) { + _output.WriteLine("Produced message: {0}", msg.Message); + produced.Add((TestEvent)msg.Message); + return ValueTask.CompletedTask; + } + + await producer.Produce( + new StreamName(topicName), + events, + new Metadata(), + new KafkaProduceOptions("test"), + onAck: OnAck + ); + + await producer.StopAsync(default); + + produced.Should().BeEquivalentTo(events); + + using var consumer = GetConsumer(topicName); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + consumer.Subscribe(topicName); + + var consumed = new List(); + + while (!cts.IsCancellationRequested) { + var msg = consumer.Consume(cts.Token); + if (msg == null) return; + + var meta = msg.Message.Headers.AsMetadata(); + + var messageType = meta[KafkaHeaderKeys.MessageTypeHeader] as string; + var contentType = meta[KafkaHeaderKeys.ContentTypeHeader] as string; + + var result = + DefaultEventSerializer.Instance.DeserializeEvent(msg.Message.Value, messageType!, contentType!) as + SuccessfullyDeserialized; + + var evt = (result!.Payload as TestEvent)!; + _output.WriteLine($"Consumed {evt}"); + consumed.Add(evt); + if (consumed.Count == events.Length) break; + } + + _output.WriteLine($"Consumed {consumed.Count} events"); + consumed.Should().BeEquivalentTo(events); + } + + IConsumer GetConsumer(string groupId) { + var config = new ConsumerConfig { + BootstrapServers = BrokerList, + GroupId = groupId, + EnableAutoCommit = false, + StatisticsIntervalMs = 5000, + SessionTimeoutMs = 6000, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky + }; + + return new ConsumerBuilder(config) + .SetErrorHandler((_, e) => _output.WriteLine($"Error: {e.Reason}")) + .SetStatisticsHandler((_, json) => _output.WriteLine($"Statistics: {json}")) + .SetPartitionsAssignedHandler( + (c, partitions) => { + _output.WriteLine( + "Partitions incrementally assigned: [" + + string.Join(',', partitions.Select(p => p.Partition.Value)) + + "], all: [" + + string.Join(',', c.Assignment.Concat(partitions).Select(p => p.Partition.Value)) + + "]" + ); + } + ) + .SetPartitionsRevokedHandler( + (c, partitions) => { + var remaining = c.Assignment.Where( + atp => partitions.All(rtp => rtp.TopicPartition != atp) + ); + + _output.WriteLine( + "Partitions incrementally revoked: [" + + string.Join(',', partitions.Select(p => p.Partition.Value)) + + "], remaining: [" + + string.Join(',', remaining.Select(p => p.Partition.Value)) + + "]" + ); + } + ) + .SetPartitionsLostHandler( + (c, partitions) => _output.WriteLine($"Partitions were lost: [{string.Join(", ", partitions)}]") + ) + .Build(); + } +} diff --git a/src/Kafka/test/Eventuous.Tests.Kafka/Eventuous.Tests.Kafka.csproj b/src/Kafka/test/Eventuous.Tests.Kafka/Eventuous.Tests.Kafka.csproj new file mode 100644 index 00000000..cf6197dc --- /dev/null +++ b/src/Kafka/test/Eventuous.Tests.Kafka/Eventuous.Tests.Kafka.csproj @@ -0,0 +1,16 @@ + + + net6.0 + + true + false + true + osx-x64 + + + + + + + + diff --git a/src/Mongo/docker-compose.yml b/src/Mongo/docker-compose.yml new file mode 100644 index 00000000..86bc5982 --- /dev/null +++ b/src/Mongo/docker-compose.yml @@ -0,0 +1,11 @@ +version: '3.7' + +services: + mongo: + container_name: eventuous-mongo + image: mongo + ports: + - '27017:27017' + environment: + MONGO_INITDB_ROOT_USERNAME: mongoadmin + MONGO_INITDB_ROOT_PASSWORD: secret diff --git a/src/RabbitMq/docker-compose.yml b/src/RabbitMq/docker-compose.yml new file mode 100644 index 00000000..773e7810 --- /dev/null +++ b/src/RabbitMq/docker-compose.yml @@ -0,0 +1,12 @@ +version: '3.7' + +services: + rabbitmq: + container_name: eventuous-rabbitmq + hostname: rabbitmq + image: rabbitmq:management-alpine + ports: + - '4369:4369' + - '5672:5672' + - '25672:25672' + - '15672:15672' diff --git a/src/RabbitMq/src/Eventuous.RabbitMq/Producers/RabbitMqProducer.cs b/src/RabbitMq/src/Eventuous.RabbitMq/Producers/RabbitMqProducer.cs index e751a3de..8e2ffd12 100644 --- a/src/RabbitMq/src/Eventuous.RabbitMq/Producers/RabbitMqProducer.cs +++ b/src/RabbitMq/src/Eventuous.RabbitMq/Producers/RabbitMqProducer.cs @@ -26,9 +26,9 @@ public class RabbitMqProducer : BaseProducer, IHostedSer /// Optional: event serializer instance /// Optional: additional configuration for the exchange public RabbitMqProducer( - ConnectionFactory connectionFactory, - IEventSerializer? serializer = null, - RabbitMqExchangeOptions? options = null + ConnectionFactory connectionFactory, + IEventSerializer? serializer = null, + RabbitMqExchangeOptions? options = null ) : base(TracingOptions) { _options = options; _serializer = serializer ?? DefaultEventSerializer.Instance; @@ -71,8 +71,7 @@ protected override async Task ProduceMessages( } void Publish(string stream, ProducedMessage message, RabbitMqProduceOptions? options) { - if (_channel == null) - throw new InvalidOperationException("Producer hasn't been initialized, call Initialize"); + if (_channel == null) throw new InvalidOperationException("Producer hasn't been initialized, call Initialize"); var (msg, metadata) = (message.Message, message.Metadata); var (eventType, contentType, payload) = _serializer.SerializeEvent(msg); @@ -100,19 +99,18 @@ void Publish(string stream, ProducedMessage message, RabbitMqProduceOptions? opt readonly ExchangeCache _exchangeCache = new(); - void EnsureExchange(string exchange) { - _exchangeCache.EnsureExchange( + void EnsureExchange(string exchange) + => _exchangeCache.EnsureExchange( exchange, () => _channel!.ExchangeDeclare( exchange, - _options?.Type ?? ExchangeType.Fanout, - _options?.Durable ?? true, + _options?.Type ?? ExchangeType.Fanout, + _options?.Durable ?? true, _options?.AutoDelete ?? false, _options?.Arguments ) ); - } async Task Confirm(CancellationToken cancellationToken) { while (!_channel!.WaitForConfirms(ConfirmTimeout) && !cancellationToken.IsCancellationRequested) { @@ -133,4 +131,4 @@ public Task StopAsync(CancellationToken cancellationToken = default) { return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs b/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs index eee2acb2..0cde826d 100644 --- a/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs +++ b/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs @@ -157,7 +157,7 @@ IMessageConsumeContext CreateContext(object sender, BasicDeliverEventArgs receiv ); var meta = received.BasicProperties.Headers != null - ? new Metadata(received.BasicProperties.Headers.ToDictionary(x => x.Key, x => x.Value)) + ? new Metadata(received.BasicProperties.Headers.ToDictionary(x => x.Key, x => x.Value)!) : null; return new MessageConsumeContext( diff --git a/src/RabbitMq/test/Eventuous.Tests.RabbitMq/SubscriptionSpec.cs b/src/RabbitMq/test/Eventuous.Tests.RabbitMq/SubscriptionSpec.cs index 7f4ad1ff..3b4ba9d4 100644 --- a/src/RabbitMq/test/Eventuous.Tests.RabbitMq/SubscriptionSpec.cs +++ b/src/RabbitMq/test/Eventuous.Tests.RabbitMq/SubscriptionSpec.cs @@ -55,7 +55,7 @@ public async Task SubscribeAndProduce() { _handler.AssertThat().Any(x => x as TestEvent == testEvent); - await _producer.Produce(_exchange, testEvent); + await _producer.Produce(_exchange, testEvent, new Metadata()); await _handler.Validate(10.Seconds()); } @@ -67,7 +67,7 @@ public async Task SubscribeAndProduceMany() { _handler.AssertThat().Exactly(count, x => testEvents.Contains(x)); - await _producer.Produce(_exchange, testEvents); + await _producer.Produce(_exchange, testEvents, new Metadata()); await _handler.Validate(10.Seconds()); } diff --git a/src/Shovel/src/Eventuous.Gateway/GatewayContext.cs b/src/Shovel/src/Eventuous.Gateway/GatewayContext.cs deleted file mode 100644 index 4b8861a1..00000000 --- a/src/Shovel/src/Eventuous.Gateway/GatewayContext.cs +++ /dev/null @@ -1,11 +0,0 @@ -namespace Eventuous.Gateway; - -public record GatewayContext(StreamName TargetStream, object? Message, Metadata? Metadata); - -[PublicAPI] -public record GatewayContext( - StreamName TargetStream, - object? Message, - Metadata? Metadata, - TProduceOptions ProduceOptions -) : GatewayContext(TargetStream, Message, Metadata); \ No newline at end of file diff --git a/src/Shovel/src/Eventuous.Gateway/GatewayHandler.cs b/src/Shovel/src/Eventuous.Gateway/GatewayHandler.cs deleted file mode 100644 index 9b659800..00000000 --- a/src/Shovel/src/Eventuous.Gateway/GatewayHandler.cs +++ /dev/null @@ -1,32 +0,0 @@ -using Eventuous.Subscriptions.Context; - -namespace Eventuous.Gateway; - -public delegate ValueTask RouteAndTransform(IMessageConsumeContext context); - -class GatewayHandler : BaseEventHandler { - readonly IEventProducer _eventProducer; - readonly RouteAndTransform _transform; - - public GatewayHandler(IEventProducer eventProducer, RouteAndTransform transform) { - _eventProducer = eventProducer; - _transform = transform; - } - - public override async ValueTask HandleEvent(IMessageConsumeContext context) { - var shovelMessage = await _transform(context).NoContext(); - - if (shovelMessage?.Message == null) return EventHandlingStatus.Ignored; - - await _eventProducer - .Produce( - shovelMessage.TargetStream, - shovelMessage.Message, - shovelMessage.GetMeta(context), - context.CancellationToken - ) - .NoContext(); - - return EventHandlingStatus.Success; - } -} diff --git a/src/Shovel/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs b/src/Shovel/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs deleted file mode 100644 index 497ca97f..00000000 --- a/src/Shovel/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs +++ /dev/null @@ -1,41 +0,0 @@ -using Eventuous.Subscriptions.Context; - -namespace Eventuous.Gateway; - -public delegate ValueTask?> RouteAndTransform( - IMessageConsumeContext message -); - -public class GatewayHandler : BaseEventHandler - where TProduceOptions : class { - readonly IEventProducer _eventProducer; - - readonly RouteAndTransform _transform; - - public GatewayHandler(IEventProducer eventProducer, RouteAndTransform transform) { - _eventProducer = eventProducer; - _transform = transform; - } - - public override async ValueTask HandleEvent(IMessageConsumeContext context) { - var shovelMessage = await _transform(context).NoContext(); - - if (shovelMessage?.Message == null) return EventHandlingStatus.Ignored; - - try { - await _eventProducer.Produce( - shovelMessage.TargetStream, - shovelMessage.Message, - shovelMessage.GetMeta(context), - shovelMessage.ProduceOptions, - context.CancellationToken - ) - .NoContext(); - } - catch (OperationCanceledException e) { - context.Nack(e); - } - - return EventHandlingStatus.Success; - } -} diff --git a/src/Shovel/src/Eventuous.Gateway/IGatewayTransform.cs b/src/Shovel/src/Eventuous.Gateway/IGatewayTransform.cs deleted file mode 100644 index 0f9078b5..00000000 --- a/src/Shovel/src/Eventuous.Gateway/IGatewayTransform.cs +++ /dev/null @@ -1,11 +0,0 @@ -using Eventuous.Subscriptions.Context; - -namespace Eventuous.Gateway; - -public interface IGatewayTransform { - ValueTask RouteAndTransform(IMessageConsumeContext context); -} - -public interface IGatewayTransform { - ValueTask?> RouteAndTransform(IMessageConsumeContext context); -} diff --git a/src/Shovel/src/Eventuous.Gateway/Registrations/GatewayRegistrations.cs b/src/Shovel/src/Eventuous.Gateway/Registrations/GatewayRegistrations.cs deleted file mode 100644 index e83a946d..00000000 --- a/src/Shovel/src/Eventuous.Gateway/Registrations/GatewayRegistrations.cs +++ /dev/null @@ -1,86 +0,0 @@ -// ReSharper disable CheckNamespace - -using Eventuous.Gateway; -using Microsoft.Extensions.DependencyInjection.Extensions; - -namespace Microsoft.Extensions.DependencyInjection; - -[PublicAPI] -public static class GatewayRegistrations { - public static IServiceCollection AddGateway( - this IServiceCollection services, - string subscriptionId, - RouteAndTransform routeAndTransform, - Action? configureSubscription = null - ) - where TSubscription : EventSubscription - where TProducer : class, IEventProducer - where TSubscriptionOptions : SubscriptionOptions { - services.AddSubscription( - subscriptionId, - builder => builder - .Configure(configureSubscription) - .AddEventHandler( - sp => new GatewayHandler( - new GatewayProducer(sp.GetRequiredService()), - routeAndTransform - ) - ) - ); - - return services; - } - - public static IServiceCollection AddGateway( - this IServiceCollection services, - string subscriptionId, - Action? configureSubscription = null - ) - where TSubscription : EventSubscription - where TProducer : class, IEventProducer - where TSubscriptionOptions : SubscriptionOptions { - services.AddSubscription( - subscriptionId, - builder => builder - .Configure(configureSubscription) - .AddEventHandler(GetHandler) - ); - - return services; - - IEventHandler GetHandler(IServiceProvider sp) { - var transform = sp.GetRequiredService(); - var producer = sp.GetRequiredService(); - - return new GatewayHandler(new GatewayProducer(producer), transform); - } - } - - public static IServiceCollection AddGateway( - this IServiceCollection services, - string subscriptionId, - Action? configureSubscription = null - ) - where TSubscription : EventSubscription - where TProducer : class, IEventProducer - where TTransform : class, IGatewayTransform - where TSubscriptionOptions : SubscriptionOptions { - services.TryAddSingleton(); - services.AddSubscription( - subscriptionId, - builder => builder - .Configure(configureSubscription) - .AddEventHandler(GetHandler) - ); - - return services; - - - IEventHandler GetHandler(IServiceProvider sp) { - var transform = sp.GetRequiredService(); - var producer = sp.GetRequiredService(); - - return new GatewayHandler(new GatewayProducer(producer), transform.RouteAndTransform); - } - } -} \ No newline at end of file