diff --git a/HangFire.TopicExtensions/Attributes/SubscriberJobAttribute.cs b/HangFire.TopicExtensions/Attributes/SubscriberJobAttribute.cs index b9042dc..525c36f 100644 --- a/HangFire.TopicExtensions/Attributes/SubscriberJobAttribute.cs +++ b/HangFire.TopicExtensions/Attributes/SubscriberJobAttribute.cs @@ -1,36 +1,34 @@ using System; -using System.Collections.Generic; -using System.Text; using Hangfire.States; namespace HangFire.TopicExtensions.Attributes { - [AttributeUsage(AttributeTargets.Class)] + [AttributeUsage(AttributeTargets.Class, AllowMultiple = true)] public class SubscriberJobAttribute : Attribute { + public SubscriberJobAttribute(string topic) : this(topic, EnqueuedState.DefaultQueue) + { + } - public string TopicJobId { get; set; } - - public string Queue { get; set; } - - public bool Enabled { get; set; } = true; - - public SubscriberJobAttribute(string topic) : this(topic, EnqueuedState.DefaultQueue) { } + public SubscriberJobAttribute(string topic, string queue) + { + if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic)); - - - public SubscriberJobAttribute(string topic, string queue) - { - if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic)); - - if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); + if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); - - Topic = topic; - Queue = queue; - } + + Topic = topic; + Queue = queue; + } + + public string TopicJobId { get; set; } + + public string Queue { get; set; } + + + public bool Enabled { get; set; } = true; public string Topic { get; set; } } -} +} \ No newline at end of file diff --git a/HangFire.TopicExtensions/TopicPublisher.cs b/HangFire.TopicExtensions/TopicPublisher.cs index 4cfb01f..4975340 100644 --- a/HangFire.TopicExtensions/TopicPublisher.cs +++ b/HangFire.TopicExtensions/TopicPublisher.cs @@ -7,45 +7,58 @@ using HangFire.TopicExtensions.Interfaces; using Microsoft.Extensions.DependencyInjection; - namespace HangFire.TopicExtensions { public class TopicPublisher : ITopicPublisher { - private readonly IServiceProvider _serviceProvider; public TopicPublisher(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; + BuildSubscribers(); } + public List Subscriptions { get; set; } + public void EnqueueTopic(string topic, object context = null) { - // Find Subscribers BackgroundJob.Enqueue(() => DispatchTopic(topic, context)); - } - public void DispatchTopic(string topic, object context) + private void BuildSubscribers() { + var allTypes = GetImplementationsOf(); + Subscriptions = new List(); - var allSubscribers = GetImplementationsOf(); - - foreach (var type in allSubscribers) + foreach (var type in allTypes) { - var attributes = type.GetCustomAttributes(); - var subscribed = attributes.Any(a=> a.Topic.ToLower() == topic); - if (!subscribed) continue; - - var impl = (ISubscriber)ActivatorUtilities.CreateInstance(_serviceProvider,type); - BackgroundJob.Enqueue(() => impl.Execute(context)); + var subscriberJobAttributes = attributes.ToList(); + if (!subscriberJobAttributes.Any()) continue; + + + var impl = (ISubscriber) ActivatorUtilities.CreateInstance(_serviceProvider, type); + foreach (var subscriberJobAttribute in subscriberJobAttributes) + { + var subscription = new Subscription + { + Topic = subscriberJobAttribute.Topic, + Subscriber = impl + }; + + Subscriptions.Add(subscription); + } } + } + public void DispatchTopic(string topic, object context) + { + foreach (var subscription in Subscriptions.Where(s => s.Topic == topic)) + BackgroundJob.Enqueue(() => subscription.Subscriber.Execute(context)); } private static IEnumerable GetImplementationsOf() @@ -56,7 +69,11 @@ private static IEnumerable GetImplementationsOf() assembly.GetTypes().Where(type => !type.IsInterface && interfaceType.IsAssignableFrom(type))) .SelectMany(implementation => implementation); } - } - + public class Subscription + { + public string Topic { get; set; } + public ISubscriber Subscriber { get; set; } + } + } } \ No newline at end of file