From ebed19ae41391ec18b671253d2eb955183566720 Mon Sep 17 00:00:00 2001 From: Olivier Coanet Date: Wed, 8 Jan 2020 09:54:14 +0100 Subject: [PATCH] Support IEventProcessorSequenceAware in ValueBatchEventProcessor --- .../SequenceReportingCallbackTests.cs | 67 +++++++++++++------ src/Disruptor/BatchEventProcessor.cs | 17 +++-- src/Disruptor/IBatchStartAware.cs | 4 +- src/Disruptor/IEventProcessorSequenceAware.cs | 21 ++++++ src/Disruptor/IEventReleaseAware.cs | 5 +- src/Disruptor/ILifecycleAware.cs | 5 +- ...r.cs => ISequenceReportingEventHandler.cs} | 9 +-- src/Disruptor/ValueBatchEventProcessor.cs | 8 +-- 8 files changed, 90 insertions(+), 46 deletions(-) create mode 100644 src/Disruptor/IEventProcessorSequenceAware.cs rename src/Disruptor/{SequenceReportingEventHandler.cs => ISequenceReportingEventHandler.cs} (70%) diff --git a/src/Disruptor.Tests/SequenceReportingCallbackTests.cs b/src/Disruptor.Tests/SequenceReportingCallbackTests.cs index 66125e6c..40098942 100644 --- a/src/Disruptor.Tests/SequenceReportingCallbackTests.cs +++ b/src/Disruptor.Tests/SequenceReportingCallbackTests.cs @@ -1,4 +1,6 @@ +using System; using System.Threading; +using System.Threading.Tasks; using Disruptor.Tests.Support; using NUnit.Framework; @@ -7,59 +9,84 @@ namespace Disruptor.Tests [TestFixture] public class SequenceReportingCallbackTests { - private readonly ManualResetEvent _callbackSignal = new ManualResetEvent(false); - private readonly ManualResetEvent _onEndOfBatchSignal = new ManualResetEvent(false); - [Test] - public void ShouldReportProgressByUpdatingSequenceViaCallback() + public void ShouldReportEventHandlerProgressByUpdatingSequenceViaCallback() { var ringBuffer = RingBuffer.CreateMultiProducer(() => new StubEvent(-1), 16); var sequenceBarrier = ringBuffer.NewBarrier(); - ISequenceReportingEventHandler handler = new TestSequenceReportingEventHandler(_callbackSignal, _onEndOfBatchSignal); + var handler = new TestSequenceReportingEventHandler(); + var batchEventProcessor = BatchEventProcessorFactory.Create(ringBuffer, sequenceBarrier, handler); + ringBuffer.AddGatingSequences(batchEventProcessor.Sequence); + + var task = Task.Run(batchEventProcessor.Run); + + Assert.AreEqual(-1L, batchEventProcessor.Sequence.Value); + ringBuffer.Publish(ringBuffer.Next()); + + handler.CallbackSignal.WaitOne(); + Assert.AreEqual(0L, batchEventProcessor.Sequence.Value); + + handler.OnEndOfBatchSignal.Set(); + Assert.AreEqual(0L, batchEventProcessor.Sequence.Value); + + batchEventProcessor.Halt(); + Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10))); + } + + [Test] + public void ShouldReportValueEventHandlerProgressByUpdatingSequenceViaCallback() + { + var ringBuffer = ValueRingBuffer.CreateMultiProducer(() => new StubValueEvent(-1), 16); + var sequenceBarrier = ringBuffer.NewBarrier(); + var handler = new TestSequenceReportingEventHandler(); var batchEventProcessor = BatchEventProcessorFactory.Create(ringBuffer, sequenceBarrier, handler); ringBuffer.AddGatingSequences(batchEventProcessor.Sequence); - var thread = new Thread(batchEventProcessor.Run) {IsBackground = true}; - thread.Start(); + var task = Task.Run(batchEventProcessor.Run); Assert.AreEqual(-1L, batchEventProcessor.Sequence.Value); ringBuffer.Publish(ringBuffer.Next()); - _callbackSignal.WaitOne(); + handler.CallbackSignal.WaitOne(); Assert.AreEqual(0L, batchEventProcessor.Sequence.Value); - _onEndOfBatchSignal.Set(); + handler.OnEndOfBatchSignal.Set(); Assert.AreEqual(0L, batchEventProcessor.Sequence.Value); batchEventProcessor.Halt(); - thread.Join(); + Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10))); } - private class TestSequenceReportingEventHandler : ISequenceReportingEventHandler + private class TestSequenceReportingEventHandler : IEventHandler, IValueEventHandler, IEventProcessorSequenceAware { private ISequence _sequenceCallback; - private readonly ManualResetEvent _callbackSignal; - private readonly ManualResetEvent _onEndOfBatchSignal; - public TestSequenceReportingEventHandler(ManualResetEvent callbackSignal, ManualResetEvent onEndOfBatchSignal) - { - _callbackSignal = callbackSignal; - _onEndOfBatchSignal = onEndOfBatchSignal; - } + public ManualResetEvent CallbackSignal { get; } = new ManualResetEvent(false); + public ManualResetEvent OnEndOfBatchSignal { get; } = new ManualResetEvent(false); public void SetSequenceCallback(ISequence sequenceTrackerCallback) { _sequenceCallback = sequenceTrackerCallback; } + public void OnEvent(ref StubValueEvent data, long sequence, bool endOfBatch) + { + OnEvent(sequence, endOfBatch); + } + public void OnEvent(StubEvent evt, long sequence, bool endOfBatch) + { + OnEvent(sequence, endOfBatch); + } + + private void OnEvent(long sequence, bool endOfBatch) { _sequenceCallback.SetValue(sequence); - _callbackSignal.Set(); + CallbackSignal.Set(); if (endOfBatch) { - _onEndOfBatchSignal.WaitOne(); + OnEndOfBatchSignal.WaitOne(); } } } diff --git a/src/Disruptor/BatchEventProcessor.cs b/src/Disruptor/BatchEventProcessor.cs index 959c014f..9aedb003 100644 --- a/src/Disruptor/BatchEventProcessor.cs +++ b/src/Disruptor/BatchEventProcessor.cs @@ -6,12 +6,12 @@ namespace Disruptor /// /// Convenience class for handling the batching semantics of consuming events from a /// and delegating the available events to an . - /// + /// /// If the also implements it will be notified just after the thread /// is started and just before the thread is shutdown. - /// + /// /// This class is kept mainly for compatibility reasons. - /// + /// /// Consider using to create your . /// /// the type of event used. @@ -21,7 +21,7 @@ public class BatchEventProcessor : BatchEventProcessor, I /// /// Construct a BatchEventProcessor that will automatically track the progress by updating its sequence when /// the method returns. - /// + /// /// Consider using to create your . /// /// dataProvider to which events are published @@ -52,7 +52,7 @@ public void OnBatchStart(long batchSize) /// /// Convenience class for handling the batching semantics of consuming events from a /// and delegating the available events to an . - /// + /// /// If the also implements it will be notified just after the thread /// is started and just before the thread is shutdown. /// @@ -63,7 +63,6 @@ public void OnBatchStart(long batchSize) /// the type of the used. public class BatchEventProcessor : IBatchEventProcessor where T : class - where TDataProvider : IDataProvider where TSequenceBarrier : ISequenceBarrier where TEventHandler : IEventHandler @@ -92,7 +91,7 @@ private static class RunningStates /// /// Construct a BatchEventProcessor that will automatically track the progress by updating its sequence when /// the method returns. - /// + /// /// Consider using to create your . /// /// dataProvider to which events are published @@ -106,8 +105,8 @@ public BatchEventProcessor(TDataProvider dataProvider, TSequenceBarrier sequence _eventHandler = eventHandler; _batchStartAware = batchStartAware; - if (eventHandler is ISequenceReportingEventHandler sequenceReportingEventHandler) - sequenceReportingEventHandler.SetSequenceCallback(_sequence); + if (eventHandler is IEventProcessorSequenceAware sequenceAware) + sequenceAware.SetSequenceCallback(_sequence); _timeoutHandler = eventHandler as ITimeoutHandler; } diff --git a/src/Disruptor/IBatchStartAware.cs b/src/Disruptor/IBatchStartAware.cs index 4ed7e3a6..89d2606b 100644 --- a/src/Disruptor/IBatchStartAware.cs +++ b/src/Disruptor/IBatchStartAware.cs @@ -1,12 +1,12 @@ namespace Disruptor { /// - /// Implement this interface in your to be notified when a batch is starting. + /// Implement this interface in your event handler to be notified when a batch is starting. /// public interface IBatchStartAware { /// - /// Called on each batch start before the first call to . + /// Called on each batch start before the first call to or . /// /// the batch size. void OnBatchStart(long batchSize); diff --git a/src/Disruptor/IEventProcessorSequenceAware.cs b/src/Disruptor/IEventProcessorSequenceAware.cs new file mode 100644 index 00000000..2d787a30 --- /dev/null +++ b/src/Disruptor/IEventProcessorSequenceAware.cs @@ -0,0 +1,21 @@ +namespace Disruptor +{ + /// + /// Implement this interface in your event handler to obtain the sequence. + /// + /// Used by the to set a callback allowing the event handler to notify + /// when it has finished consuming an event if this happens after the OnEvent call. + /// + /// Typically this would be used when the handler is performing some sort of batching operation such as writing to an IO + /// device; after the operation has completed, the implementation should set to update the + /// sequence and allow other processes that are dependent on this handler to progress. + /// + public interface IEventProcessorSequenceAware + { + /// + /// Call by the to setup the callback. + /// + /// callback on which to notify the that the sequence has progressed. + void SetSequenceCallback(ISequence sequenceCallback); + } +} diff --git a/src/Disruptor/IEventReleaseAware.cs b/src/Disruptor/IEventReleaseAware.cs index de9a11d9..e0cccc32 100644 --- a/src/Disruptor/IEventReleaseAware.cs +++ b/src/Disruptor/IEventReleaseAware.cs @@ -1,7 +1,10 @@ namespace Disruptor { + /// + /// Implement this interface in your to obtain the . + /// public interface IEventReleaseAware { void SetEventReleaser(IEventReleaser eventReleaser); } -} \ No newline at end of file +} diff --git a/src/Disruptor/ILifecycleAware.cs b/src/Disruptor/ILifecycleAware.cs index 1c43d343..08f94b9a 100644 --- a/src/Disruptor/ILifecycleAware.cs +++ b/src/Disruptor/ILifecycleAware.cs @@ -1,8 +1,7 @@ namespace Disruptor { /// - /// Implement this interface in your to be notified when a thread for the - /// starts and shuts down. + /// Implement this interface in your event handler to be notified when the processing thread starts and shuts down. /// public interface ILifecycleAware { @@ -13,7 +12,7 @@ public interface ILifecycleAware /// /// Called once just before the thread is shutdown. - /// + /// /// Sequence event processing will already have stopped before this method is called. No events will /// be processed after this message. /// diff --git a/src/Disruptor/SequenceReportingEventHandler.cs b/src/Disruptor/ISequenceReportingEventHandler.cs similarity index 70% rename from src/Disruptor/SequenceReportingEventHandler.cs rename to src/Disruptor/ISequenceReportingEventHandler.cs index ac525903..10ffd664 100644 --- a/src/Disruptor/SequenceReportingEventHandler.cs +++ b/src/Disruptor/ISequenceReportingEventHandler.cs @@ -3,18 +3,13 @@ namespace Disruptor /// /// Used by the to set a callback allowing the to notify /// when it has finished consuming an event if this happens after the call. - /// + /// /// Typically this would be used when the handler is performing some sort of batching operation such as writing to an IO /// device; after the operation has completed, the implementation should set to update the /// sequence and allow other processes that are dependent on this handler to progress. /// /// event implementation storing the data for sharing during exchange or parallel coordination of an event. - public interface ISequenceReportingEventHandler : IEventHandler + public interface ISequenceReportingEventHandler : IEventHandler, IEventProcessorSequenceAware { - /// - /// Call by the to setup the callback. - /// - /// callback on which to notify the that the sequence has progressed. - void SetSequenceCallback(ISequence sequenceCallback); } } diff --git a/src/Disruptor/ValueBatchEventProcessor.cs b/src/Disruptor/ValueBatchEventProcessor.cs index 403acc83..eb4aff3e 100644 --- a/src/Disruptor/ValueBatchEventProcessor.cs +++ b/src/Disruptor/ValueBatchEventProcessor.cs @@ -6,7 +6,7 @@ namespace Disruptor /// /// Convenience class for handling the batching semantics of consuming events from a /// and delegating the available events to an . - /// + /// /// If the also implements it will be notified just after the thread /// is started and just before the thread is shutdown. /// @@ -46,7 +46,7 @@ private static class RunningStates /// /// Construct a BatchEventProcessor that will automatically track the progress by updating its sequence when /// the method returns. - /// + /// /// Consider using to create your . /// /// dataProvider to which events are published @@ -60,8 +60,8 @@ public ValueBatchEventProcessor(TDataProvider dataProvider, TSequenceBarrier seq _eventHandler = eventHandler; _batchStartAware = batchStartAware; - if (eventHandler is ISequenceReportingEventHandler sequenceReportingEventHandler) - sequenceReportingEventHandler.SetSequenceCallback(_sequence); + if (eventHandler is IEventProcessorSequenceAware sequenceAware) + sequenceAware.SetSequenceCallback(_sequence); _timeoutHandler = eventHandler as ITimeoutHandler; }