8
8
using global ::RabbitMQ . Client ;
9
9
using global ::RabbitMQ . Client . Events ;
10
10
using Microsoft . Extensions . ObjectPool ;
11
+ using Microsoft . Extensions . Options ;
11
12
12
13
/// <summary>
13
14
/// Default RabbitMQ Bus.
@@ -52,10 +53,10 @@ public class DefaultRabbitMQBus : IEasyCachingBus
52
53
/// <param name="serializer">Serializer.</param>
53
54
public DefaultRabbitMQBus (
54
55
IPooledObjectPolicy < IConnection > _objectPolicy
55
- , RabbitMQBusOptions rabbitMQOptions
56
- , IEasyCachingSerializer serializer )
56
+ , IOptions < RabbitMQBusOptions > rabbitMQOptions
57
+ , IEasyCachingSerializer serializer )
57
58
{
58
- this . _options = rabbitMQOptions ;
59
+ this . _options = rabbitMQOptions . Value ;
59
60
this . _serializer = serializer ;
60
61
61
62
var factory = new ConnectionFactory
@@ -132,7 +133,7 @@ public void Publish(string topic, EasyCachingMessage message)
132
133
}
133
134
return Task . CompletedTask ;
134
135
}
135
-
136
+
136
137
/// <summary>
137
138
/// Subscribe the specified topic and action.
138
139
/// </summary>
@@ -142,7 +143,7 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action)
142
143
{
143
144
_handler = action ;
144
145
var queueName = string . Empty ;
145
- if ( string . IsNullOrWhiteSpace ( _options . QueueName ) )
146
+ if ( string . IsNullOrWhiteSpace ( _options . QueueName ) )
146
147
{
147
148
queueName = $ "rmq.queue.undurable.easycaching.subscriber.{ _busId } ";
148
149
}
@@ -151,13 +152,13 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action)
151
152
queueName = _options . QueueName ;
152
153
}
153
154
154
- Task . Factory . StartNew ( ( ) =>
155
+ Task . Factory . StartNew ( ( ) =>
155
156
{
156
157
var model = _subConnection . CreateModel ( ) ;
157
158
model . ExchangeDeclare ( _options . TopicExchangeName , ExchangeType . Topic , true , false , null ) ;
158
159
model . QueueDeclare ( queueName , false , false , true , null ) ;
159
160
// bind the queue with the exchange.
160
- model . QueueBind ( _options . TopicExchangeName , queueName , _options . RouteKey ) ;
161
+ model . QueueBind ( queueName , _options . TopicExchangeName , topic ) ;
161
162
var consumer = new EventingBasicConsumer ( model ) ;
162
163
consumer . Received += OnMessage ;
163
164
consumer . Shutdown += OnConsumerShutdown ;
0 commit comments