Skip to content

Commit

Permalink
Allowing Multiple Subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
jmarbutt committed Jun 25, 2020
1 parent 0246a61 commit 7f933af
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 38 deletions.
42 changes: 20 additions & 22 deletions HangFire.TopicExtensions/Attributes/SubscriberJobAttribute.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
using System;
using System.Collections.Generic;
using System.Text;
using Hangfire.States;

namespace HangFire.TopicExtensions.Attributes
{
[AttributeUsage(AttributeTargets.Class)]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public class SubscriberJobAttribute : Attribute
{
public SubscriberJobAttribute(string topic) : this(topic, EnqueuedState.DefaultQueue)
{
}

public string TopicJobId { get; set; }

public string Queue { get; set; }


public bool Enabled { get; set; } = true;

public SubscriberJobAttribute(string topic) : this(topic, EnqueuedState.DefaultQueue) { }
public SubscriberJobAttribute(string topic, string queue)
{
if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic));



public SubscriberJobAttribute(string topic, string queue)
{
if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic));

if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue));
if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue));


Topic = topic;
Queue = queue;
}

Topic = topic;
Queue = queue;
}

public string TopicJobId { get; set; }

public string Queue { get; set; }


public bool Enabled { get; set; } = true;

public string Topic { get; set; }
}
}
}
49 changes: 33 additions & 16 deletions HangFire.TopicExtensions/TopicPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,58 @@
using HangFire.TopicExtensions.Interfaces;
using Microsoft.Extensions.DependencyInjection;


namespace HangFire.TopicExtensions
{
public class TopicPublisher : ITopicPublisher
{

private readonly IServiceProvider _serviceProvider;

public TopicPublisher(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
BuildSubscribers();
}

public List<Subscription> Subscriptions { get; set; }

public void EnqueueTopic(string topic, object context = null)
{

// Find Subscribers
BackgroundJob.Enqueue(() => DispatchTopic(topic, context));

}

public void DispatchTopic(string topic, object context)
private void BuildSubscribers()
{
var allTypes = GetImplementationsOf<ISubscriber>();
Subscriptions = new List<Subscription>();

var allSubscribers = GetImplementationsOf<ISubscriber>();

foreach (var type in allSubscribers)
foreach (var type in allTypes)
{

var attributes = type.GetCustomAttributes<SubscriberJobAttribute>();

var subscribed = attributes.Any(a=> a.Topic.ToLower() == topic);
if (!subscribed) continue;
var impl = (ISubscriber)ActivatorUtilities.CreateInstance(_serviceProvider,type);
BackgroundJob.Enqueue(() => impl.Execute(context));
var subscriberJobAttributes = attributes.ToList();
if (!subscriberJobAttributes.Any()) continue;


var impl = (ISubscriber) ActivatorUtilities.CreateInstance(_serviceProvider, type);

foreach (var subscriberJobAttribute in subscriberJobAttributes)
{
var subscription = new Subscription
{
Topic = subscriberJobAttribute.Topic,
Subscriber = impl
};

Subscriptions.Add(subscription);
}
}
}

public void DispatchTopic(string topic, object context)
{
foreach (var subscription in Subscriptions.Where(s => s.Topic == topic))
BackgroundJob.Enqueue(() => subscription.Subscriber.Execute(context));
}

private static IEnumerable<Type> GetImplementationsOf<TInterface>()
Expand All @@ -56,7 +69,11 @@ private static IEnumerable<Type> GetImplementationsOf<TInterface>()
assembly.GetTypes().Where(type => !type.IsInterface && interfaceType.IsAssignableFrom(type)))
.SelectMany(implementation => implementation);
}
}


public class Subscription
{
public string Topic { get; set; }
public ISubscriber Subscriber { get; set; }
}
}
}

0 comments on commit 7f933af

Please sign in to comment.