Skip to content

Commit

Permalink
feat: add event notification feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ailtonguitar committed Oct 2, 2023
1 parent 0970b4c commit 4429cea
Show file tree
Hide file tree
Showing 17 changed files with 373 additions and 186 deletions.
6 changes: 2 additions & 4 deletions src/KafkaFlow.Abstractions/Consumers/IWorker.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
namespace KafkaFlow

Check warning on line 1 in src/KafkaFlow.Abstractions/Consumers/IWorker.cs

View workflow job for this annotation

GitHub Actions / build

A property should not follow a method [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]
{
using System;
using KafkaFlow.Configuration;
using KafkaFlow.Observer;

/// <summary>
/// Represents the interface of a internal worker
Expand All @@ -23,11 +21,11 @@ public interface IWorker
/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppingSubject, VoidObject> WorkerStopping { get; }
IEvent WorkerStopping { get; }

Check warning on line 24 in src/KafkaFlow.Abstractions/Consumers/IWorker.cs

View workflow job for this annotation

GitHub Actions / Test deployment

A property should not follow a method

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppedSubject, VoidObject> WorkerStopped { get; }
IEvent WorkerStopped { get; }
}
}
19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs

This file was deleted.

19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs

This file was deleted.

42 changes: 42 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
public interface IEvent
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
void Subscribe(Func<Task> handler);

/// <summary>
/// Unsubscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
void Unsubscribe(Func<Task> handler);
}

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
public interface IEvent<TArg>
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
void Subscribe(Func<TArg, Task> handler);

/// <summary>
/// Unsubscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
void Unsubscribe(Func<TArg, Task> handler);
}
}
17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/Observer/ISubject.cs

This file was deleted.

18 changes: 0 additions & 18 deletions src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs

This file was deleted.

53 changes: 0 additions & 53 deletions src/KafkaFlow.Abstractions/Observer/Subject.cs

This file was deleted.

17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/VoidObject.cs

This file was deleted.

10 changes: 2 additions & 8 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
using System.Threading.Tasks;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Observer;

internal class BatchConsumeMiddleware
: IMessageMiddleware,
ISubjectObserver<WorkerStoppedSubject, VoidObject>,
IDisposable
internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable
{
private readonly SemaphoreSlim dispatchSemaphore = new(1, 1);

Expand All @@ -37,7 +33,7 @@ public BatchConsumeMiddleware(
this.batch = new(batchSize);
this.consumerConfiguration = middlewareContext.Consumer.Configuration;

middlewareContext.Worker.WorkerStopped.Subscribe(this);
middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync());
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
Expand Down Expand Up @@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
}

public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync();

public void Dispose()
{
this.dispatchTask?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void Setup()

workerMock
.SetupGet(x => x.WorkerStopped)
.Returns(new WorkerStoppedSubject(this.logHandlerMock.Object));
.Returns(new Event(this.logHandlerMock.Object));

consumerConfigurationMock
.SetupGet(x => x.AutoMessageCompletion)
Expand Down
Loading

0 comments on commit 4429cea

Please sign in to comment.