Skip to content

Commit

Permalink
Merge pull request #754 from dolittle/eventhorizon-error-handling
Browse files Browse the repository at this point in the history
Eventhorizon error handling
  • Loading branch information
mhelleborg authored Oct 16, 2023
2 parents d330a51 + 299bbfb commit feb9b20
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 24 deletions.
22 changes: 11 additions & 11 deletions Integration/Shared/Runtime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,26 @@ public static RunningRuntime CreateAndStart(int numberOfTenants)
.AddEnvironmentVariables()
.Build();

var configuration = new Dictionary<string, string>();
var configuration = new Dictionary<string, string?>();
var (databases, tenants) = CreateRuntimeConfiguration(configuration, numberOfTenants);

var runtimeHost = Host.CreateDefaultBuilder()
.UseDolittleServices()
.ConfigureOpenTelemetry(cfg)
.ConfigureHostConfiguration(_ =>
.ConfigureHostConfiguration(builder =>
{
_.Sources.Clear();
_.AddInMemoryCollection(configuration);
builder.Sources.Clear();
builder.AddInMemoryCollection(configuration);
})
.ConfigureAppConfiguration(_ =>
.ConfigureAppConfiguration(builder =>
{
_.Sources.Clear();
_.AddInMemoryCollection(configuration);
builder.Sources.Clear();
builder.AddInMemoryCollection(configuration);
})
.ConfigureServices(_ =>
.ConfigureServices(coll =>
{
_.AddLogging(_ => _.ClearProviders());
_.AddOptions<EndpointsConfiguration>().Configure(builder =>
coll.AddLogging(builder => builder.ClearProviders());
coll.AddOptions<EndpointsConfiguration>().Configure(builder =>
{
builder.Management = new EndpointConfiguration { Port = 0 };
// builder.Private = new EndpointConfiguration { Port = 0 };
Expand Down Expand Up @@ -106,7 +106,7 @@ public static Task DropAllDatabases(RunningRuntime runtime)
public static Dolittle.Runtime.Execution.ExecutionContext CreateExecutionContextFor(TenantId tenant)
=> new(_microserviceId, tenant, Version.NotSet, _environment, Guid.NewGuid(), null, Claims.Empty, CultureInfo.InvariantCulture);

static (IEnumerable<string> databases, IEnumerable<TenantId> tenants) CreateRuntimeConfiguration(Dictionary<string, string> configuration,
static (IEnumerable<string> databases, IEnumerable<TenantId> tenants) CreateRuntimeConfiguration(Dictionary<string, string?> configuration,
int numberOfTenants)
{
configuration["dolittle:runtime:eventstore:backwardscompatibility:version"] = "V7";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected static async Task run_event_handlers_until_completion(IEnumerable<Task
(_, _) => dispatcher.Object.Reject(new EventHandlerRegistrationResponse(), CancellationToken.None),
CancellationToken.None)));
tasks.Add(post_start_task ?? Task.CompletedTask);
reset_timeout_after_action(TimeSpan.FromSeconds(2));
reset_timeout_after_action(TimeSpan.FromSeconds(6));
await Task.WhenAll(tasks).ConfigureAwait(false);
}

Expand Down Expand Up @@ -296,7 +296,7 @@ protected static void fail_for_event_sources(IEnumerable<EventSourceId> event_so
});
}

static async Task commit_after_delay(IEnumerable<(int number_of_events, EventSourceId event_source, ScopeId scope)> commit)
static async Task commit_after_delay((int number_of_events, EventSourceId event_source, ScopeId scope)[] commit)
{
if (commit.Any())
{
Expand Down
7 changes: 4 additions & 3 deletions Source/EventHorizon/Consumer/Processing/EventProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,20 @@ public Task<IProcessingResult> Process(CommittedEvent @event, PartitionId partit
return Process(@event, cancellationToken);
}

async Task<IProcessingResult> Process(CommittedEvent @event, CancellationToken cancellationToken)
async Task<IProcessingResult> Process(CommittedEvent @event, CancellationToken _)
{
_metrics.IncrementTotalEventHorizonEventsProcessed();
Log.ProcessEvent(_logger, @event.Type.Id, Scope, _subscriptionId.ProducerMicroserviceId, _subscriptionId.ProducerTenantId);
try
{
await _externalEventsCommitter.Commit(new CommittedEvents(new []{@event}), _consentId, Scope).ConfigureAwait(false);
return SuccessfulProcessing.Instance;
}
catch (Exception e)
{
_logger.LogWarning(e, "Failed to commit external event");
_logger.LogWarning(e, "Failed to commit external event, will retry processing later");
_metrics.IncrementTotalEventHorizonEventWritesFailed();
throw;
}
return SuccessfulProcessing.Instance;
}
}
8 changes: 4 additions & 4 deletions Source/Events.Store.MongoDB/CommittedEventsFetcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ async IAsyncEnumerable<CommittedAggregateEvents> DoFetchForAggregate(EventSource
new Artifact(
evt.Aggregate.TypeId,
evt.Aggregate.TypeGeneration),
evt.Aggregate.Version,
evt.EventLogSequenceNumber,
evt.Metadata.Occurred,
evt.Metadata.EventSource,
aggregateRootVersion: evt.Aggregate.Version,
eventLogSequenceNumber: evt.EventLogSequenceNumber,
occurred:evt.Metadata.Occurred,
eventSource:evt.Metadata.EventSource,
evt.ExecutionContext.ToExecutionContext(),
new Artifact(
evt.Metadata.TypeId,
Expand Down
1 change: 0 additions & 1 deletion Source/Events.Store.MongoDB/DatabaseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Conventions;
using MongoDB.Driver;
using MongoDB.Driver.Core.Extensions.DiagnosticSources;

namespace Dolittle.Runtime.Events.Store.MongoDB;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ static bool IsDuplicateKeyException(Exception exception)
MongoDuplicateKeyException => true,
MongoWriteException writeException => writeException.Message.Contains("duplicate key error"),
MongoBulkWriteException bulkWriteException => bulkWriteException.Message.Contains("duplicate key error"),
MongoCommandException commandException => commandException.Message.Contains("WriteConflict"),
_ => false,
};
}
1 change: 0 additions & 1 deletion Source/Events/Store/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using Dolittle.Runtime.Events.Processing.Streams;
using Microsoft.Extensions.Logging;
using Proto;

namespace Dolittle.Runtime.Events.Store;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Dolittle.Runtime.DependencyInversion.Lifecycle;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Dolittle.Runtime.Events.Processing.Streams.Partitioned;
using Dolittle.Runtime.Events.Store.Actors;
using FluentAssertions;
Expand Down

0 comments on commit feb9b20

Please sign in to comment.