diff --git a/src/Testing/CoreTests/ConfiguredMessageExtensionsTests.cs b/src/Testing/CoreTests/ConfiguredMessageExtensionsTests.cs index 2e1aa31a..df9127db 100644 --- a/src/Testing/CoreTests/ConfiguredMessageExtensionsTests.cs +++ b/src/Testing/CoreTests/ConfiguredMessageExtensionsTests.cs @@ -19,6 +19,44 @@ public void delayed_for() configured.Message.ShouldBe(inner); } + [Fact] + public void chain_delayed_for() + { + var delay = 5.Minutes(); + var inner = new Message1(); + + var configured = inner.WithTenantId("one") + .DelayedFor(5.Minutes()); + + configured.Options.ScheduleDelay.ShouldBe(delay); + configured.Message.ShouldBe(inner); + configured.Options.TenantId.ShouldBe("one"); + } + + [Fact] + public void chain_with_tenant_id() + { + var delay = 5.Minutes(); + var inner = new Message1(); + + var configured = inner + .DelayedFor(delay).WithTenantId("one"); + + configured.Options.ScheduleDelay.ShouldBe(delay); + configured.Options.TenantId.ShouldBe("one"); + } + + [Fact] + public void chain_with_group_id() + { + var inner = new Message1(); + var configured = inner.WithTenantId("one") + .WithGroupId("g1"); + + configured.Options.TenantId.ShouldBe("one"); + configured.Options.GroupId.ShouldBe("g1"); + } + [Fact] public void scheduled_at() { @@ -30,6 +68,19 @@ public void scheduled_at() configured.Options.ScheduledTime.ShouldBe(time); configured.Message.ShouldBe(inner); } + + [Fact] + public void chain_scheduled_at() + { + var time = (DateTimeOffset)DateTime.Today; + var inner = new Message1(); + + var configured = inner.WithTenantId("one").ScheduledAt(time); + + configured.Options.ScheduledTime.ShouldBe(time); + configured.Message.ShouldBe(inner); + configured.Options.TenantId.ShouldBe("one"); + } [Fact] public void to_endpoint() @@ -67,4 +118,19 @@ public async Task to_topic() await bus.Received().BroadcastToTopicAsync("blue", inner); } + + [Fact] + public async Task chaining_to_topic() + { + var inner = new Message1(); + var message = inner.WithTenantId("one").ToTopic("blue"); + message.Message.ShouldBeSameAs(inner); + message.Topic.ShouldBe("blue"); + message.Options.TenantId.ShouldBe("one"); + + var bus = Substitute.For(); + await message.As().ApplyAsync(bus); + + await bus.Received().BroadcastToTopicAsync("blue", inner, message.Options); + } } \ No newline at end of file diff --git a/src/Wolverine/ISendMyself.cs b/src/Wolverine/ISendMyself.cs index 5b8d2f48..4ac13ec3 100644 --- a/src/Wolverine/ISendMyself.cs +++ b/src/Wolverine/ISendMyself.cs @@ -38,6 +38,18 @@ public static DeliveryMessage WithTenantId(this T message, string tenantId return new DeliveryMessage(message, new DeliveryOptions { TenantId = tenantId }); } + /// + /// Create a cascading message tagged to a specific tenant id + /// + /// + /// + /// + public static DeliveryMessage WithTenantId(this DeliveryMessage message, string tenantId) + { + message.Options.TenantId = tenantId; + return message; + } + /// /// Create a cascading message tagged to a specific group id /// @@ -49,6 +61,19 @@ public static DeliveryMessage WithGroupId(this T message, string groupId) { return new DeliveryMessage(message, new DeliveryOptions { GroupId = groupId }); } + + /// + /// Create a cascading message tagged to a specific group id + /// + /// + /// + /// + /// + public static DeliveryMessage WithGroupId(this DeliveryMessage message, string groupId) + { + message.Options.GroupId = groupId; + return message; + } /// /// Create a cascading message tagged to a specific group id and scheduled for a set time @@ -99,6 +124,18 @@ public static ScheduledMessage ScheduledAt(this T message, DateTimeOffset { return new ScheduledMessage(message, time); } + + /// + /// Schedule the inner outgoing message to be sent at the specified time + /// + /// + /// + /// + public static DeliveryMessage ScheduledAt(this DeliveryMessage message, DateTimeOffset time) + { + message.Options.ScheduledTime = time; + return message; + } /// /// Schedule the inner outgoing message to be sent after the specified delay @@ -106,9 +143,21 @@ public static ScheduledMessage ScheduledAt(this T message, DateTimeOffset /// /// /// - public static DelayedMessage DelayedFor(this T message, TimeSpan delay) + public static DeliveryMessage DelayedFor(this T message, TimeSpan delay) { - return new DelayedMessage(message, delay); + return new DeliveryMessage(message, new DeliveryOptions{ScheduleDelay = delay}); + } + + /// + /// Schedule the inner outgoing message to be sent after the specified delay + /// + /// + /// + /// + public static DeliveryMessage DelayedFor(this DeliveryMessage message, TimeSpan delay) + { + message.Options.ScheduleDelay = delay; + return message; } /// @@ -121,7 +170,18 @@ public static RoutedToEndpointMessage ToEndpoint(this T message, string en { return new RoutedToEndpointMessage(endpointName, message, options); } - + + /// + /// Send a message directly to the named endpoint as a cascading message + /// + /// + /// + /// + public static RoutedToEndpointMessage ToEndpoint(this DeliveryMessage message, string endpointName) + { + return new RoutedToEndpointMessage(endpointName, message.Message, message.Options); + } + /// /// Send a message directly to the specific destination as a cascading message /// @@ -133,6 +193,17 @@ public static RoutedToEndpointMessage ToDestination(this T message, Uri de return new RoutedToEndpointMessage(destination, message, options); } + /// + /// Send a message directly to the specific destination as a cascading message + /// + /// + /// + /// + public static RoutedToEndpointMessage ToDestination(this DeliveryMessage message, Uri destination) + { + return new RoutedToEndpointMessage(destination, message.Message, message.Options); + } + /// /// Send a message to the supplied topic /// @@ -145,6 +216,19 @@ public static TopicMessage ToTopic(this T message, string topic, DeliveryO { return new TopicMessage(message, topic, options); } + + /// + /// Send a message to the supplied topic + /// + /// + /// The topic name for the underlying message broker + /// Optional delivery options + /// + /// + public static TopicMessage ToTopic(this DeliveryMessage message, string topic) + { + return new TopicMessage(message.Message, topic, message.Options); + } } public record TopicMessage(T Message, string Topic, DeliveryOptions? Options) : ISendMyself @@ -155,16 +239,6 @@ ValueTask ISendMyself.ApplyAsync(IMessageContext context) } } -/// -/// Wrapper for a cascading message that has delayed delivery -/// -public class DelayedMessage : DeliveryMessage -{ - public DelayedMessage(T message, TimeSpan delay) : base(message, new DeliveryOptions { ScheduleDelay = delay }) - { - } -} - public class ScheduledMessage : DeliveryMessage { public ScheduledMessage(T message, DateTimeOffset time) : base(message,