Skip to content

Commit e1fbb4f

Browse files
committed
Add queue whitelisting
1 parent c6003b4 commit e1fbb4f

File tree

4 files changed

+245
-36
lines changed

4 files changed

+245
-36
lines changed

README.md

+30-11
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ This is inspired from the [Rabbit MQ implementation](http://docs.servicestack.ne
1717
- OneWay MQ and HTTP Service Clients are Substitutable
1818

1919

20-
>> ServiceStack has added MQ support for Azure Service Bus as part of their v4.5.14 release maintained >> at https://github.com/ServiceStack/ServiceStack.Azure.
21-
>>
22-
>> I would recommend using the official implementation instead of this one if it covers your needs.
23-
>>
24-
>> One reason you may want to give this non-official implementation a try is that you are not targeting
25-
>> .NET Core and you need some feature that is not part of the official MQ Server.
26-
>>
27-
>> Ideally, the official package eventually offers all features (and likely more) and this repository can enjoy
28-
>> an early retirement.
20+
> ServiceStack has added MQ support for Azure Service Bus as part of their v4.5.14 release maintained at https://github.com/ServiceStack/ServiceStack.Azure.
21+
>
22+
> I would recommend using the official implementation instead of this one if it covers your needs.
23+
>
24+
> One reason you may want to give this non-official implementation a try is that you are not targeting
25+
> .NET Core and you need some feature that is not part of the official MQ Server.
26+
>
27+
> Ideally, the official package eventually offers all features (and likely more) and this repository can enjoy
28+
> an early retirement.
2929
3030
## Adding Azure Service Bus MQ support to ServiceStack
3131

@@ -64,6 +64,10 @@ The [AzureBusServer](src\ServiceStack.AzureServiceBus\AzureBusServer.cs) has the
6464
- `Action<QueueDescription>` **CreateQueueFilter** - A filter to customize the options Azure Queues are created/updated with.
6565
- `Action<string, BrokeredMessage>` **GetMessageFilter** - Called every time a message is received.
6666
- `Action<string, BrokeredMessage, IMessage>` **PublishMessageFilter** - Called every time a message gets published.
67+
- `string[]` **PriorityQueuesWhitelist** - If you only want to enable priority queue handlers (and threads) for specific message types. All message types have priority queues by default.
68+
- `bool` **DisablePriorityQueues** - No priority queue will be created or listened to.
69+
- `string[]` **PublishResponsesWhitelist** - Opt-in to only publish responses on this whitelist. All responses are published by default.
70+
- `bool` **DisablePublishingResponses** - No response will be published.
6771

6872
As an alternative to a connection string, you can pass an instance of `AzureBusMessageFactory` to the `AzureBusServer` constructor and provide your own [NamespaceManager](https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.namespacemanager?redirectedfrom=MSDN&view=azureservicebus-4.1.1#microsoft_servicebus_namespacemanager) and [MessagingFactory](https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.messagingfactory?view=azureservicebus-4.1.1).
6973

@@ -118,9 +122,24 @@ using (var mqClient = mqServer.CreateMessageQueueClient())
118122
}
119123
```
120124

121-
Note that `brokeredMsg` parameter of `GetMessageFilter` when explicitly retrieving a message results in a timeout.
125+
Note that the `brokeredMsg` parameter of `GetMessageFilter` can be null when explicitly retrieving a message results in a timeout.
126+
127+
## Whitelisting priority messages and publishing responses
128+
129+
By default, all registered handlers will result in listening to a normal priority queue and a high priority queue. As well, all message responses get published to their respective queues.
130+
131+
Priority messages and publishing responses can be entirely disabled by setting `DisablePriorityQueues` and `DisablePublishingResponses` respectively to true.
132+
133+
It is also possible to whitelist the priority queues and responses to publish by message type.
134+
135+
```
136+
// only use a priority queue for Hello messages
137+
mqServer.PriorityQueuesWhitelist = new[] { nameof(Hello) };
138+
139+
// only publish HelloResponse responses
140+
mqServer.PublishResponsesWhitelist = new[] { nameof(HelloResponse) };
141+
```
122142

123143
## Upcoming Features
124144

125-
- [ ] queue whitelisting
126145
- [ ] error handler

src/ServiceStack.AzureServiceBus/AzureBusExtensions.cs

+9-4
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,18 @@ public static Task RegisterQueuesAsync(
277277
QueueNames queueNames,
278278
Action<QueueDescription> createQueueFilter = null)
279279
{
280-
return Task.WhenAll(
281-
namespaceMgr.RegisterQueueAsync(queueNames.In, createQueueFilter),
282-
namespaceMgr.RegisterQueueAsync(queueNames.Priority, createQueueFilter),
283-
namespaceMgr.RegisterQueueAsync(queueNames.Out, createQueueFilter));
280+
return RegisterQueuesAsync(namespaceMgr, new[] { queueNames.In, queueNames.Out, queueNames.Priority }, createQueueFilter);
284281
// queueNames.Dlq is created by Azure Service Bus
285282
}
286283

284+
public static Task RegisterQueuesAsync(
285+
this NamespaceManager namespaceMgr,
286+
IEnumerable<string> queueNames,
287+
Action<QueueDescription> createQueueFilter = null)
288+
{
289+
return Task.WhenAll(queueNames.Select(x => namespaceMgr.RegisterQueueAsync(x, createQueueFilter)));
290+
}
291+
287292
public static Task RegisterQueuesAsync<T>(
288293
this NamespaceManager namespaceMgr,
289294
Action<QueueDescription> createQueueFilter = null)

src/ServiceStack.AzureServiceBus/AzureBusServer.cs

+42-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using ServiceStack.Logging;
33
using ServiceStack.Messaging;
44
using ServiceStack.Text;
5+
using ServiceStack;
56
using System;
67
using System.Collections.Generic;
78
using System.Linq;
@@ -84,6 +85,36 @@ public Action<string, BrokeredMessage, IMessage> PublishMessageFilter
8485
/// </summary>
8586
public Func<object, object> ResponseFilter { get; set; }
8687

88+
/// <summary>
89+
/// If you only want to enable priority queue handlers (and threads) for specific msg types
90+
/// </summary>
91+
public string[] PriorityQueuesWhitelist { get; set; }
92+
93+
/// <summary>
94+
/// Don't listen on any Priority Queues
95+
/// </summary>
96+
public bool DisablePriorityQueues
97+
{
98+
set
99+
{
100+
PriorityQueuesWhitelist = TypeConstants.EmptyStringArray;
101+
}
102+
}
103+
104+
/// <summary>
105+
/// Opt-in to only publish responses on this white list.
106+
/// Publishes all responses by default.
107+
/// </summary>
108+
public string[] PublishResponsesWhitelist { get; set; }
109+
110+
/// <summary>
111+
/// Don't publish any response messages
112+
/// </summary>
113+
public bool DisablePublishingResponses
114+
{
115+
set { PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null; }
116+
}
117+
87118
private int status;
88119

89120
public AzureBusServer(string connectionString): this(new AzureBusMessageFactory(connectionString))
@@ -157,6 +188,7 @@ protected IMessageHandlerFactory CreateMessageHandlerFactory<T>(Func<IMessage<T>
157188
RequestFilter = RequestFilter,
158189
ResponseFilter = ResponseFilter,
159190
RetryCount = RetryCount,
191+
PublishResponsesWhitelist = PublishResponsesWhitelist
160192
};
161193
}
162194

@@ -204,19 +236,27 @@ public virtual async Task Init()
204236
var queueNames = new QueueNames(msgType);
205237
var noOfThreads = handlerThreadCountMap[msgType];
206238

207-
msgPumpsBuilder.Add(new AzureMessageReceiverPump(
239+
var queuesToRegister = new List<string> { queueNames.In, queueNames.Out };
240+
241+
if (PriorityQueuesWhitelist == null
242+
|| PriorityQueuesWhitelist.Any(x => x == msgType.Name))
243+
{
244+
msgPumpsBuilder.Add(new AzureMessageReceiverPump(
208245
messageFactory,
209246
handlerFactory,
210247
queueNames.Priority,
211248
noOfThreads));
212249

250+
queuesToRegister.Add(queueNames.Priority);
251+
}
252+
213253
msgPumpsBuilder.Add(new AzureMessageReceiverPump(
214254
messageFactory,
215255
handlerFactory,
216256
queueNames.In,
217257
noOfThreads));
218258

219-
await messageFactory.NamespaceManager.RegisterQueuesAsync(queueNames, createQueueFilter).ConfigureAwait(false);
259+
await messageFactory.NamespaceManager.RegisterQueuesAsync(queuesToRegister, createQueueFilter).ConfigureAwait(false);
220260
}
221261

222262
messagePumps = msgPumpsBuilder.ToArray();

tests/ServiceStack.AzureServiceBus.Tests/AzureBusServerTests.cs

+164-19
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,25 @@ public class AlwaysThrows
2626
public string Value { get; set; }
2727
}
2828

29+
public class Hello : IReturn<HelloResponse>
30+
{
31+
public string Name { get; set; }
32+
}
33+
public class HelloNull1 : IReturn<HelloResponse>
34+
{
35+
public string Name { get; set; }
36+
}
37+
38+
public class HelloNull2 : IReturn<HelloResponse>
39+
{
40+
public string Name { get; set; }
41+
}
42+
43+
public class HelloResponse
44+
{
45+
public string Result { get; set; }
46+
}
47+
2948
[TestFixture, Category("Integration")]
3049
[NonParallelizable]
3150
public class AzureBusServerTests
@@ -151,25 +170,6 @@ public async Task Can_receive_and_process_same_reply_responses()
151170
}
152171
}
153172

154-
public class Hello : IReturn<HelloResponse>
155-
{
156-
public string Name { get; set; }
157-
}
158-
public class HelloNull1 : IReturn<HelloResponse>
159-
{
160-
public string Name { get; set; }
161-
}
162-
163-
public class HelloNull2 : IReturn<HelloResponse>
164-
{
165-
public string Name { get; set; }
166-
}
167-
168-
public class HelloResponse
169-
{
170-
public string Result { get; set; }
171-
}
172-
173173
[Test]
174174
public async Task Can_receive_and_process_standard_request_reply_combo()
175175
{
@@ -369,6 +369,7 @@ await mqServer.MessageFactory.PurgeQueuesAsync(
369369
});
370370

371371
var msg = mqClient.Get<HelloNull2>(replyMq, TimeSpan.FromSeconds(10));
372+
mqClient.Ack(msg);
372373

373374
await Task.Delay(100);
374375

@@ -439,5 +440,149 @@ await nsMgr.RegisterQueuesAsync<Hello>(desc =>
439440
}
440441
}
441442
}
443+
444+
[Test]
445+
public async Task Can_disable_priority_queues()
446+
{
447+
var msgsReceived = 0;
448+
449+
using (var mqServer = CreateMqServer())
450+
{
451+
await mqServer.MessageFactory.PurgeQueuesAsync(
452+
QueueNames<Hello>.In,
453+
QueueNames<HelloResponse>.In,
454+
QueueNames<Hello>.Priority);
455+
456+
mqServer.DisablePriorityQueues = true;
457+
458+
mqServer.RegisterHandler<Hello>(message =>
459+
{
460+
Interlocked.Increment(ref msgsReceived);
461+
return new HelloResponse() { Result = $"{message.GetBody().Name} world" };
462+
});
463+
464+
mqServer.Start();
465+
466+
using (var mqClient = mqServer.CreateMessageQueueClient())
467+
{
468+
mqClient.Publish(new Message<Hello>(new Hello { Name = "Hello" }));
469+
mqClient.Publish(new Message<Hello>(new Hello { Name = "Hello in priority" }) { Priority = 1 });
470+
471+
var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, Config.ServerWaitTime);
472+
mqClient.Ack(msg);
473+
Assert.That(msg.GetBody().Result, Is.EqualTo("Hello world"));
474+
}
475+
476+
Assert.That(msgsReceived, Is.EqualTo(1));
477+
}
478+
}
479+
480+
[Test]
481+
public async Task Can_whitelist_priority_queue_by_message_type()
482+
{
483+
var msgsReceived = 0;
484+
485+
using (var mqServer = CreateMqServer())
486+
{
487+
await mqServer.MessageFactory.PurgeQueuesAsync(
488+
QueueNames<Hello>.In,
489+
QueueNames<HelloResponse>.In,
490+
QueueNames<Hello>.Priority);
491+
492+
mqServer.PriorityQueuesWhitelist = new[] { nameof(Hello) };
493+
494+
mqServer.RegisterHandler<Hello>(message =>
495+
{
496+
Interlocked.Increment(ref msgsReceived);
497+
return new HelloResponse() { Result = $"{message.GetBody().Name} world" };
498+
});
499+
500+
mqServer.Start();
501+
502+
using (var mqClient = mqServer.CreateMessageQueueClient())
503+
{
504+
mqClient.Publish(new Message<Hello>(new Hello { Name = "Hello" }));
505+
mqClient.Publish(new Message<Hello>(new Hello { Name = "Hello in priority" }) { Priority = 1 });
506+
507+
var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, Config.ServerWaitTime);
508+
mqClient.Ack(msg);
509+
msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, Config.ServerWaitTime);
510+
mqClient.Ack(msg);
511+
}
512+
513+
Assert.That(msgsReceived, Is.EqualTo(2));
514+
}
515+
}
516+
517+
[Test]
518+
public async Task Can_disable_publishing_responses()
519+
{
520+
var msgsReceived = 0;
521+
522+
using (var mqServer = CreateMqServer())
523+
{
524+
await mqServer.MessageFactory.PurgeQueuesAsync(
525+
QueueNames<Hello>.In,
526+
QueueNames<HelloResponse>.In,
527+
QueueNames<Hello>.Priority);
528+
529+
mqServer.DisablePublishingResponses = true;
530+
531+
mqServer.RegisterHandler<Hello>(message =>
532+
{
533+
Interlocked.Increment(ref msgsReceived);
534+
return new HelloResponse() { Result = $"{message.GetBody().Name} world" };
535+
});
536+
537+
mqServer.Start();
538+
539+
using (var mqClient = mqServer.CreateMessageQueueClient())
540+
{
541+
mqClient.Publish(new Hello { Name = "Hello" });
542+
543+
var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, TimeSpan.FromSeconds(1));
544+
Assert.Null(msg);
545+
}
546+
547+
Assert.That(msgsReceived, Is.EqualTo(1));
548+
}
549+
}
550+
551+
[Test]
552+
public async Task Can_whitelist_publishing_responses_by_message_type()
553+
{
554+
var msgsReceived = 0;
555+
556+
using (var mqServer = CreateMqServer())
557+
{
558+
await mqServer.MessageFactory.PurgeQueuesAsync(
559+
QueueNames<Hello>.In,
560+
QueueNames<HelloResponse>.In,
561+
QueueNames<Hello>.Priority);
562+
563+
mqServer.PublishResponsesWhitelist = new[] { nameof(HelloResponse) };
564+
565+
mqServer.RegisterHandler<Hello>(message =>
566+
{
567+
Interlocked.Increment(ref msgsReceived);
568+
return new HelloResponse() { Result = $"{message.GetBody().Name} world" };
569+
});
570+
571+
mqServer.Start();
572+
573+
using (var mqClient = mqServer.CreateMessageQueueClient())
574+
{
575+
mqClient.Publish(new Hello { Name = "Hello" });
576+
577+
var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, Config.ServerWaitTime);
578+
if (msg != null)
579+
{
580+
mqClient.Ack(msg);
581+
}
582+
}
583+
584+
Assert.That(msgsReceived, Is.EqualTo(1));
585+
}
586+
}
442587
}
443588
}

0 commit comments

Comments
 (0)