Skip to content

Commit

Permalink
feat(Eventbus.Outbox.EFCore): posibility to add multiple events
Browse files Browse the repository at this point in the history
  • Loading branch information
TrotsenkoSergey committed Mar 17, 2024
1 parent 180c55b commit f59eadf
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 31 deletions.
8 changes: 7 additions & 1 deletion EventBus/EventBus.Outbox/EFCore/DependencyInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ private static IServiceCollection AddOutboxServices(this IServiceCollection serv
{
services.AddCommonOutboxServices();

services.AddSingleton(new EventLogSettings(assemblyFullNameWhereIntegrationEventsStore));
var eventTypes = Assembly
.Load(assemblyFullNameWhereIntegrationEventsStore)
.GetTypes()
.Where(t => t.BaseType == typeof(IntegrationEvent))
.ToList();

services.AddSingleton(new EventLogSettings(assemblyFullNameWhereIntegrationEventsStore, eventTypes));
services.AddTransient<IIntegrationEventLogPersistenceTransactional, IntegrationEventLogService>();
services.AddTransient<IIntegrationEventLogPersistence>(sp => sp.GetRequiredService<IIntegrationEventLogPersistenceTransactional>());

Expand Down
3 changes: 2 additions & 1 deletion EventBus/EventBus.Outbox/EFCore/EventLogSettings.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Tsw.EventBus.Outbox;

public record EventLogSettings(
string AssemblyFullNameWhereIntegrationEventsStore
string AssemblyFullNameWhereIntegrationEventsStore,
IReadOnlyList<Type> EventTypes
);
59 changes: 30 additions & 29 deletions EventBus/EventBus.Outbox/EFCore/IntegrationEventLogService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class IntegrationEventLogService : IIntegrationEventLogPersistenceTransactional
{
protected readonly List<Type> _eventTypes;
protected readonly IReadOnlyList<Type> _eventTypes;
protected readonly IntegrationEventLogDbContext _context;
private readonly JsonSerializerOptions _jsonSerializerOptions;

Expand All @@ -11,12 +11,7 @@ public IntegrationEventLogService(
IntegrationEventLogDbContext context,
IOptionsMonitor<JsonSerializerOptions> jsonSerializerOptions)
{
_eventTypes = Assembly
.Load(outboxSettings.AssemblyFullNameWhereIntegrationEventsStore)
.GetTypes()
.Where(t => t.Name.EndsWith(nameof(IntegrationEvent)))
.ToList();

_eventTypes = outboxSettings.EventTypes;
_context = context;
_jsonSerializerOptions = jsonSerializerOptions.Get("Outbox.EFCore");
}
Expand All @@ -27,11 +22,11 @@ public virtual async Task<IEnumerable<IntegrationEventLog>> GetEventLogsAwaiting

if (!result.Any())
{
return Enumerable.Empty<IntegrationEventLog>();
return [];
}

return result.OrderBy(e => e.CreatedOnUtc)
.Select(e => DeserializeJsonContent(e, _eventTypes.Find(t => t.Name == e.EventTypeShortName)!));
.Select(e => DeserializeJsonContent(e, _eventTypes.First(t => t.Name == e.EventTypeShortName)));
}

public virtual async Task<IEnumerable<PublishContent>> GetEventLogsAwaitingToPublishInJsonAsync()
Expand All @@ -40,7 +35,7 @@ public virtual async Task<IEnumerable<PublishContent>> GetEventLogsAwaitingToPub

if (!result.Any())
{
return Enumerable.Empty<PublishContent>();
return [];
}

return result.OrderBy(e => e.CreatedOnUtc)
Expand All @@ -52,31 +47,37 @@ protected virtual Task<List<IntegrationEventLog>> GetNotPublishedEvents() =>
.Where(e => e.State == IntegrationEventState.NotPublished)
.ToListAsync();

public virtual Task SaveEventWithAsync(DbTransaction currentTransaction, IntegrationEvent @event)
public virtual Task SaveEventWithAsync(DbTransaction currentTransaction, params IntegrationEvent[] events)
{
ArgumentNullException.ThrowIfNull(nameof(currentTransaction));

_context.Database.SetDbConnection(currentTransaction.Connection);
_context.Database.UseTransaction(currentTransaction);

return SaveEventAsync(@event);
return SaveEventAsync(events);
}

public virtual Task SaveEventAsync(IntegrationEvent @event)
public virtual Task SaveEventAsync(params IntegrationEvent[] events)
{
Type eventType = @event.GetType();
var eventLogEntry = new IntegrationEventLog()
List<IntegrationEventLog> integrationEventLogs = [];

foreach (var @event in events)
{
Id = @event.Id,
IntegrationEvent = @event,
Content = SerializeJsonContent(@event, eventType),
CreatedOnUtc = @event.CreationDate,
State = IntegrationEventState.NotPublished,
TimesSent = 0,
EventTypeName = eventType.FullName!
};

_context.Set<IntegrationEventLog>().Add(eventLogEntry);
Type eventType = @event.GetType();
var eventLogEntry = new IntegrationEventLog()
{
Id = @event.Id,
IntegrationEvent = @event,
Content = SerializeJsonContent(@event, eventType),
CreatedOnUtc = @event.CreationDate,
State = IntegrationEventState.NotPublished,
TimesSent = 0,
EventTypeName = eventType.FullName!
};
integrationEventLogs.Add(eventLogEntry);
}

_context.Set<IntegrationEventLog>().AddRange(integrationEventLogs);

return _context.SaveChangesAsync();
}
Expand All @@ -97,8 +98,8 @@ protected virtual Task UpdateEventStatusAsync(Guid eventId, IntegrationEventStat

eventLogEntry.State = status;
if (eventLogEntry.State == IntegrationEventState.InProgress)
{
eventLogEntry.TimesSent++;
{
eventLogEntry.TimesSent++;
}

return _context.SaveChangesAsync();
Expand All @@ -111,13 +112,13 @@ private IntegrationEventLog DeserializeJsonContent(IntegrationEventLog eventLog,
throw new NullReferenceException(nameof(eventLog.Content));
}

eventLog.IntegrationEvent =
eventLog.IntegrationEvent =
JsonSerializer.Deserialize(eventLog.Content, type, _jsonSerializerOptions) as IntegrationEvent ??
throw new JsonException($"Can't deserialize {type.FullName} integration event.");

return eventLog;
}

private string SerializeJsonContent(IntegrationEvent @event, Type type) =>
private string SerializeJsonContent(IntegrationEvent @event, Type type) =>
JsonSerializer.Serialize(@event, type, _jsonSerializerOptions);
}

0 comments on commit f59eadf

Please sign in to comment.