Skip to content

Commit

Permalink
Merge pull request #752 from dolittle/startfrom
Browse files Browse the repository at this point in the history
EventHandler StartFrom / StopAt support
  • Loading branch information
mhelleborg authored Sep 15, 2023
2 parents 7643a68 + aec8420 commit 98642e9
Show file tree
Hide file tree
Showing 33 changed files with 608 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Dolittle.Runtime.Events.Processing.Contracts.EventHandlerRegistrationResponse,
Dolittle.Runtime.Events.Processing.Contracts.HandleEventRequest,
Dolittle.Runtime.Events.Processing.Contracts.EventHandlerResponse>;
using StartFrom = Dolittle.Runtime.Events.Processing.EventHandlers.StartFrom;

namespace Integration.Benchmarks.Events.Processing.EventHandlers;

Expand Down Expand Up @@ -86,7 +87,7 @@ public void IterationSetup()

var eventHandlers = new List<IEventHandler>();
eventHandlers.AddRange(Enumerable.Range(0, EventHandlers).Select(_ => _eventHandlerFactory.Create(
new EventHandlerRegistrationArguments(Runtime.CreateExecutionContextFor("d9fd643f-ce74-4ae5-b706-b76859fd8827"), Guid.NewGuid(), _eventTypes, Partitioned, ScopeId.Default, Concurrency),
new EventHandlerRegistrationArguments(Runtime.CreateExecutionContextFor("d9fd643f-ce74-4ae5-b706-b76859fd8827"), Guid.NewGuid(), _eventTypes, Partitioned, ScopeId.Default, startFrom: StartFrom.Default, null, Concurrency),
_dispatcher.Object,
CancellationToken.None)));
_eventHandlersToRun = eventHandlers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Dolittle.Runtime.Events.Store.Streams;
using Dolittle.Runtime.Events.Store.Streams.Filters;
using Dolittle.Runtime.Rudimentary;
using FluentAssertions;
using Google.Protobuf.WellKnownTypes;
using Integration.Shared;
using Machine.Specifications;
Expand All @@ -37,6 +38,7 @@
using UncommittedEvent = Dolittle.Runtime.Events.Store.UncommittedEvent;
using MongoStreamEvent = Dolittle.Runtime.Events.Store.MongoDB.Events.StreamEvent;
using EventHorizonConsumerProcessor = Dolittle.Runtime.EventHorizon.Consumer.Processing.EventProcessor;
using StartFrom = Dolittle.Runtime.Events.Processing.EventHandlers.StartFrom;
using StreamEvent = Dolittle.Runtime.Events.Store.Streams.StreamEvent;

namespace Integration.Tests.Events.Processing.EventHandlers.given;
Expand Down Expand Up @@ -114,7 +116,7 @@ protected static IEnumerable<StreamEvent> get_partitioned_events_in_stream(IEven
using var cts = new CancellationTokenSource(100);

var reader = stream_event_subscriber.Subscribe(event_handler.Info.Id.Scope, event_handler.Info.EventTypes.ToList(), ProcessingPosition.Initial,
event_handler.Info.Partitioned, $"get_partitioned_events_in_stream:{event_handler.Info.Id.EventHandler.Value}", cts.Token);
event_handler.Info.Partitioned, $"get_partitioned_events_in_stream:{event_handler.Info.Id.EventHandler.Value}", null, cts.Token);

var events = new List<StreamEvent>();

Expand Down Expand Up @@ -217,7 +219,7 @@ protected static void with_event_handlers_filtering_number_of_event_types(
{
var (partitioned, max_event_types_to_filter, scope, fast, implicitFilter) = _;
var registration_arguments = new EventHandlerRegistrationArguments(
Runtime.CreateExecutionContextFor(tenant), Guid.NewGuid(), event_types.Take(max_event_types_to_filter), partitioned, scope, concurrency);
Runtime.CreateExecutionContextFor(tenant), Guid.NewGuid(), event_types.Take(max_event_types_to_filter), partitioned, scope, startFrom: StartFrom.Earliest, null, concurrency);
return event_handler_factory.Create(registration_arguments, dispatcher.Object, CancellationToken.None);
}).ToArray();
}
Expand Down Expand Up @@ -403,11 +405,11 @@ static void expect_event_processor_stream_processor_state(
var tryGetStreamProcessorState = get_stream_processor_state_for(id);
tryGetStreamProcessorState.Success.ShouldBeTrue();
var state = tryGetStreamProcessorState.Result;
state!.Partitioned.ShouldEqual(partitioned);
state!.Partitioned.Should().Be(partitioned);

if (partitioned)
{
state.Position.StreamPosition.Value.ShouldEqual((ulong)num_events_to_handle);
state.Position.StreamPosition.Value.Should().Be((ulong)num_events_to_handle);
expect_partitioned_event_processor_stream_processor_state(state, failing_partitioned_state);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ static class event_content_serializer
{
public static readonly JsonWriterSettings json_settings = new()
{
OutputMode = JsonOutputMode.Strict,
OutputMode = JsonOutputMode.CanonicalExtendedJson,
Indent = false,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ static class event_content_serializer
{
public static readonly JsonWriterSettings json_settings = new()
{
OutputMode = JsonOutputMode.Strict,
OutputMode = JsonOutputMode.CanonicalExtendedJson,
Indent = false,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ static class event_content_serializer
{
public static readonly JsonWriterSettings json_settings = new()
{
OutputMode = JsonOutputMode.Strict,
OutputMode = JsonOutputMode.CanonicalExtendedJson,
Indent = false,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ static class event_content_serializer
{
public static readonly JsonWriterSettings json_settings = new()
{
OutputMode = JsonOutputMode.Strict,
OutputMode = JsonOutputMode.CanonicalExtendedJson,
Indent = false,
};
}
35 changes: 29 additions & 6 deletions Source/Events.Store.MongoDB/CommittedEventsFetcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,42 @@ public async Task<EventLogSequenceNumber> FetchNextSequenceNumber(ScopeId scope,
_logger.LogError("No last event found, but event count was {EventCount}", eventCount);
return 0ul;
}

var nextSequenceNumber = lastEvent.EventLogSequenceNumber + 1;

if(nextSequenceNumber != eventCount)
if (nextSequenceNumber != eventCount)
{
_logger.LogError("Last event sequence number was {LastEventSequenceNumber}, but event count was {EventCount}", lastEvent.EventLogSequenceNumber, eventCount);
_logger.LogError("Last event sequence number was {LastEventSequenceNumber}, but event count was {EventCount}", lastEvent.EventLogSequenceNumber,
eventCount);
}

return nextSequenceNumber;
}

public async Task<EventLogSequenceNumber> FetchNextSequenceNumberAfter(ScopeId scope, DateTimeOffset timestamp, CancellationToken cancellationToken)
{
var eventLog = await GetEventLog(scope, cancellationToken).ConfigureAwait(false);

// Search from the end of the event log
// Find first event that happened before the given timestamp
var lastEvent = await eventLog
.Find(_eventFilter.Lt(evt => evt.Metadata.Occurred, timestamp.UtcDateTime))
.Sort(Builders<MongoDB.Events.Event>.Sort.Descending(_ => _.EventLogSequenceNumber))
.Limit(1)
.SingleOrDefaultAsync(cancellationToken).ConfigureAwait(false);

if (lastEvent is null)
{
return EventLogSequenceNumber.Initial;
}

// The next event will be after the timestamp cutoff
return lastEvent.EventLogSequenceNumber + 1;
}

/// <inheritdoc/>
public async Task<CommittedEvents> FetchCommittedEvents(ScopeId scope, EventLogSequenceNumber from, EventLogSequenceNumber to, int limit, ISet<Guid> artifactSet, CancellationToken cancellationToken)
public async Task<CommittedEvents> FetchCommittedEvents(ScopeId scope, EventLogSequenceNumber from, EventLogSequenceNumber to, int limit,
ISet<Guid> artifactSet, CancellationToken cancellationToken)
{
try
{
Expand All @@ -102,14 +124,15 @@ public async Task<CommittedEvents> FetchCommittedEvents(ScopeId scope, EventLogS
{
filter &= _eventFilter.In(_ => _.Metadata.TypeId, artifactSet);
}

var raw = await eventLog
.Find(filter)
.Sort(Builders<MongoDB.Events.Event>.Sort.Ascending(_ => _.EventLogSequenceNumber))
.Limit(limit)
.ToListAsync(cancellationToken).ConfigureAwait(false);

var events = raw.Select(evt => _eventConverter.ToRuntimeCommittedEvent(evt)).ToList();

return new CommittedEvents(events);
}
catch (MongoWaitQueueFullException ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ CancellationToken cancellationToken
_arguments.Alias,
_arguments.EventTypes,
_arguments.Partitioned,
_arguments.Concurrency);
_arguments.Concurrency,
_arguments.StartFrom,
_arguments.StopAt);

public ScopeId Scope => _arguments.Scope.Value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ public EventHandlerProcessorActor(
}

public static CreateStreamProcessorActorProps CreateFactory(ICreateProps provider)
=> (streamProcessorId, streamDefinition, createEventProcessorFor, processedEvent, failedToProcessEvent, executionContext, eventHandlerInfo, cancellationTokenSource) =>
CreatePropsFor(provider, streamProcessorId, streamDefinition, createEventProcessorFor, processedEvent, failedToProcessEvent, executionContext, eventHandlerInfo,
=> (streamProcessorId, streamDefinition, createEventProcessorFor, processedEvent, failedToProcessEvent, executionContext, eventHandlerInfo,
cancellationTokenSource) =>
CreatePropsFor(provider, streamProcessorId, streamDefinition, createEventProcessorFor, processedEvent, failedToProcessEvent, executionContext,
eventHandlerInfo,
cancellationTokenSource);

static Props CreatePropsFor(ICreateProps provider,
Expand All @@ -124,7 +126,8 @@ static Props CreatePropsFor(ICreateProps provider,
EventHandlerInfo eventHandlerInfo,
CancellationTokenSource cancellationTokenSource)
=> provider.PropsFor<EventHandlerProcessorActor>(
streamProcessorId, streamDefinition, createEventProcessorFor, executionContext, processedEvent, failedToProcessEvent, eventHandlerInfo, cancellationTokenSource);
streamProcessorId, streamDefinition, createEventProcessorFor, executionContext,
processedEvent, failedToProcessEvent, eventHandlerInfo, cancellationTokenSource);

public async Task ReceiveAsync(IContext context)
{
Expand Down Expand Up @@ -220,7 +223,7 @@ Task OnTerminated(Terminated terminated, IContext context)
{
_stopEverything.Cancel();
}

_streamProcessors.Remove(stoppedChild.Key);
}

Expand All @@ -245,6 +248,7 @@ async Task OnStarted(IContext context)
{
await Task.Delay(100);
}

if (!context.System.Cluster().MemberList.Started.IsCompletedSuccessfully)
{
await context.System.Cluster().MemberList.Started;
Expand Down
Loading

0 comments on commit 98642e9

Please sign in to comment.