diff --git a/Source/Events.Store.MongoDB/EventHorizon/EventHorizonEventsWriter.cs b/Source/Events.Store.MongoDB/EventHorizon/EventHorizonEventsWriter.cs index b08e77963..ef8b70db6 100644 --- a/Source/Events.Store.MongoDB/EventHorizon/EventHorizonEventsWriter.cs +++ b/Source/Events.Store.MongoDB/EventHorizon/EventHorizonEventsWriter.cs @@ -6,10 +6,8 @@ using System.Threading.Tasks; using Dolittle.Runtime.Events.Store.EventHorizon; using Dolittle.Runtime.Events.Store.MongoDB.Events; -using Dolittle.Runtime.Events.Store.MongoDB.Persistence; using Dolittle.Runtime.Events.Store.MongoDB.Streams; using Dolittle.Runtime.Events.Store.Streams; -using MongoDB.Driver; namespace Dolittle.Runtime.Events.Store.MongoDB.EventHorizon; diff --git a/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs b/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs index 3df30752f..3c66a6c01 100644 --- a/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs +++ b/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs @@ -57,6 +57,7 @@ public async Task Persist(Commit commit, CancellationToken cancellationToke { return new NoEventsToCommit(); } + var evt = eventsToStore.First(); using var session = await _streams.StartSessionAsync(cancellationToken: cancellationToken).ConfigureAwait(false); @@ -68,6 +69,13 @@ await _streams.DefaultEventLog.InsertManyAsync( eventsToStore, cancellationToken: cancellationToken).ConfigureAwait(false); + // Perform any redactions + if (commit.Redactions.Count > 0) + { + await RedactionUtil.RedactEvents(session, _streams.DefaultEventLog, commit.Redactions, cancellationToken) + .ConfigureAwait(false); + } + var nextSequenceNumber = commit.LastSequenceNumber + 1; await _offsetStore.UpdateOffset(Streams.Streams.EventLogCollectionName, session, nextSequenceNumber, cancellationToken) .ConfigureAwait(false); diff --git a/Source/Events.Store.MongoDB/Persistence/OffsetStore.cs b/Source/Events.Store.MongoDB/Persistence/OffsetStore.cs index 2a25897dd..e75de3083 100644 --- a/Source/Events.Store.MongoDB/Persistence/OffsetStore.cs +++ b/Source/Events.Store.MongoDB/Persistence/OffsetStore.cs @@ -10,7 +10,6 @@ using Dolittle.Runtime.DependencyInversion.Scoping; using Dolittle.Runtime.Events.Store.MongoDB.Events; using Dolittle.Runtime.Events.Store.MongoDB.Migrations; -using MongoDB.Bson; using MongoDB.Driver; namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence; diff --git a/Source/Events.Store.MongoDB/Persistence/RedactionUtil.cs b/Source/Events.Store.MongoDB/Persistence/RedactionUtil.cs new file mode 100644 index 000000000..6e8dd7f0c --- /dev/null +++ b/Source/Events.Store.MongoDB/Persistence/RedactionUtil.cs @@ -0,0 +1,173 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Dolittle.Runtime.Events.Store.Redactions; +using MongoDB.Bson; +using MongoDB.Driver; + +namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence; + +public static class RedactionUtil +{ + /// + /// This will redact specific personal data from the event log + /// + /// + /// + /// + /// + public static async Task RedactEvents(IClientSessionHandle session, IMongoCollection collection, + IReadOnlyCollection redactions, CancellationToken cancellationToken) + { + if (redactions.Count == 0) return; + + var updateOperations = redactions.Select(redaction => redaction.CreateUpdateModel()).ToList(); + + await collection.BulkWriteAsync(session, updateOperations, new BulkWriteOptions + { + IsOrdered = true, + BypassDocumentValidation = false + }, cancellationToken: cancellationToken); + } + + private static UpdateManyModel CreateUpdateModel(this Redaction redaction) + { + var (updateDefinition, hasUpdateFilter) = CreateUpdateDefinition(redaction); + var matchesEventFilter = CreateFilter(redaction); + var filter = Builders.Filter.And(matchesEventFilter, hasUpdateFilter); + return new UpdateManyModel(filter, updateDefinition); + } + + static (UpdateDefinition updateDefinition, FilterDefinition alreadyUpdatedFilter) CreateUpdateDefinition(this Redaction redaction) + { + var updates = new List>(); + var filters = new List>(); + + foreach (var (field, value) in redaction.Details.RedactedProperties) + { + var asBson = ToBsonValue(value); + + FieldDefinition contentField = $"Content.{field}"; + if (asBson is null) + { + updates.Add(Builders.Update.Unset(contentField)); + filters.Add(Builders.Filter.Exists(contentField)); + } + else + { + updates.Add(Builders.Update.Set(contentField, asBson)); + filters.Add(Builders.Filter.Ne(contentField, asBson)); + } + } + + // Point to the redaction event + updates.Add(Builders.Update.AddToSet("RedactedBy", redaction.EventLogSequenceNumber.Value)); + + var matchesAnyFilter = Builders.Filter.Or(filters); + return (Builders.Update.Combine(updates), matchesAnyFilter); + } + + static FilterDefinition CreateFilter(this Redaction redaction) + { + return Builders.Filter.And( + Builders.Filter.Lt(evt => evt.EventLogSequenceNumber, redaction.EventLogSequenceNumber.Value), + Builders.Filter.Eq(evt => evt.Metadata.EventSource, redaction.EventSourceId.Value), + Builders.Filter.Eq(evt => evt.Metadata.TypeId, redaction.TypeId) + ); + } + + static BsonValue? ToBsonValue(object? element) + { + if (element is null) + { + return null; + } + + if (element is JsonElement jsonElement) + { + return jsonElement.ToBsonValue(); + } + + switch (element) + { + case string str: + return new BsonString(str); + + case int intValue: + return new BsonInt32(intValue); + + case long longValue: + return new BsonInt64(longValue); + + case double doubleValue: + return new BsonDouble(doubleValue); + + case decimal decimalValue: + return new BsonDecimal128(decimalValue); + + case bool boolValue: + return boolValue ? BsonBoolean.True : BsonBoolean.False; + + default: + return null; + } + } + + + /// + /// Converts supported JSON values to BSON values + /// Does not support arrays or objects + /// + /// + /// + /// + static BsonValue? ToBsonValue(this JsonElement element) + { + switch (element.ValueKind) + { + case JsonValueKind.String: + var str = element.GetString(); + + return new BsonString(str); + + case JsonValueKind.Number: + if (element.TryGetInt32(out var intValue)) + { + return new BsonInt32(intValue); + } + + if (element.TryGetInt64(out var longValue)) + { + return new BsonInt64(longValue); + } + + if (element.TryGetDouble(out var doubleValue)) + { + return new BsonDouble(doubleValue); + } + + if (element.TryGetDecimal(out var decimalValue)) + { + return new BsonDecimal128(decimalValue); + } + + throw new ArgumentException("Unsupported numeric type"); + + case JsonValueKind.True: + return BsonBoolean.True; + + case JsonValueKind.False: + return BsonBoolean.False; + + case JsonValueKind.Null: + default: + return null; + } + } +} diff --git a/Source/Events.Store.MongoDB/Streams/Streams.cs b/Source/Events.Store.MongoDB/Streams/Streams.cs index c052d26ac..a2b3f23f3 100644 --- a/Source/Events.Store.MongoDB/Streams/Streams.cs +++ b/Source/Events.Store.MongoDB/Streams/Streams.cs @@ -5,7 +5,6 @@ using System.Threading.Tasks; using Dolittle.Runtime.DependencyInversion.Lifecycle; using Dolittle.Runtime.DependencyInversion.Scoping; -using Dolittle.Runtime.Events.Store.MongoDB.Events; using Dolittle.Runtime.Events.Store.Streams; using Microsoft.Extensions.Logging; using MongoDB.Driver; diff --git a/Source/Events/Store/Actors/EventStore.cs b/Source/Events/Store/Actors/EventStore.cs index 7ac0e8f72..50b95eefb 100644 --- a/Source/Events/Store/Actors/EventStore.cs +++ b/Source/Events/Store/Actors/EventStore.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Threading.Tasks; using Dolittle.Runtime.Actors; @@ -137,7 +138,8 @@ public override Task CommitExternal(CommitExternal Array.Empty(), committedEvents, request.Event.EventLogSequenceNumber, - request.Event.EventLogSequenceNumber)); + request.Event.EventLogSequenceNumber, + ImmutableList.Empty)); // TODO: Consider redaction via external event return Task.FromResult(new CommitExternalEventsResponse()); } diff --git a/Source/Events/Store/Persistence/Commit.cs b/Source/Events/Store/Persistence/Commit.cs index 73f49f689..d4259debd 100644 --- a/Source/Events/Store/Persistence/Commit.cs +++ b/Source/Events/Store/Persistence/Commit.cs @@ -16,4 +16,6 @@ public record Commit( IReadOnlyCollection AggregateEvents, IReadOnlyCollection AllEvents, EventLogSequenceNumber FirstSequenceNumber, - EventLogSequenceNumber LastSequenceNumber); + EventLogSequenceNumber LastSequenceNumber, + IReadOnlyCollection Redactions + ); diff --git a/Source/Events/Store/Persistence/CommitBuilder.cs b/Source/Events/Store/Persistence/CommitBuilder.cs index b78cea626..e65873d3e 100644 --- a/Source/Events/Store/Persistence/CommitBuilder.cs +++ b/Source/Events/Store/Persistence/CommitBuilder.cs @@ -19,6 +19,7 @@ public class CommitBuilder : ICanBuildABatch { readonly List _committedEvents = new(); readonly List _committedAggregateEvents = new(); + readonly List _redactions = new(); readonly HashSet _aggregates = new(); EventLogSequenceNumber _nextSequenceNumber; readonly List _orderedEvents = new(); @@ -76,6 +77,9 @@ public bool TryAddEventsFrom(CommitEventsRequest request, out CommittedEvents ev _orderedEvents.AddRange(committedEvents); _nextSequenceNumber = nextSequenceNumber; eventsToBeCommitted = committedEvents; + + AddRedactions(committedEvents); + return true; } catch (Exception ex) @@ -127,6 +131,9 @@ public bool TryAddEventsFrom(CommitAggregateEventsRequest request, out Committed _orderedEvents.AddRange(committedEvents); _nextSequenceNumber = nextSequenceNumber; eventsToBeCommitted = committedEvents; + + AddRedactions(committedEvents); + return true; } catch (Exception ex) @@ -135,20 +142,20 @@ public bool TryAddEventsFrom(CommitAggregateEventsRequest request, out Committed return false; } } - + /// - public Commit Build() => new (_committedEvents, _committedAggregateEvents, _orderedEvents, _initialSequenceNumber, _nextSequenceNumber - 1); + public Commit Build() => new (_committedEvents, _committedAggregateEvents, _orderedEvents, _initialSequenceNumber, _nextSequenceNumber - 1, _redactions); bool TryAddCommittedAggregateEvents(CommittedAggregateEvents events, out Exception error) { error = default; var aggregate = new Aggregate(events.AggregateRoot, events.EventSource); - if (_aggregates.Contains(aggregate)) + if (!_aggregates.Add(aggregate)) { error = new EventsForAggregateAlreadyAddedToCommit(aggregate); return false; } - _aggregates.Add(aggregate); + _committedAggregateEvents.Add(events); return true; @@ -164,4 +171,19 @@ bool TryAddCommittedAggregateEvents(CommittedAggregateEvents events, out Excepti // //TODO: Update the aggregate root version range // _committedAggregateEvents.Add(committedEvents); } + + /// + /// If the events contain valid redactions, add them to the redactions list. + /// + /// + void AddRedactions(IEnumerable committedEvents) + { + foreach (var evt in committedEvents) + { + if (Redactions.Redaction.TryGet(evt, out var redaction)) + { + _redactions.Add(redaction); + } + } + } } diff --git a/Source/Events/Store/Redactions/Redaction.cs b/Source/Events/Store/Redactions/Redaction.cs new file mode 100644 index 000000000..4a0b7db8b --- /dev/null +++ b/Source/Events/Store/Redactions/Redaction.cs @@ -0,0 +1,94 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Text.Json; + +namespace Dolittle.Runtime.Events.Store.Redactions; + +/// +/// Event that triggers redaction of the given personal data +/// It will target the given event type and redact the properties specified within the EventSourceId of the event +/// +public class Redaction +{ + public const string RedactedPrefix = "de1e7e17-bad5-da7a"; + + public EventSourceId EventSourceId { get; } + public Event Details { get; } + public EventLogSequenceNumber EventLogSequenceNumber { get; } + public Guid TypeId { get; } + + public Redaction(EventSourceId eventSourceId, Event @event, EventLogSequenceNumber eventLogSequenceNumber, + Guid typeId) + { + EventSourceId = eventSourceId; + Details = @event; + EventLogSequenceNumber = eventLogSequenceNumber; + TypeId = typeId; + } + + public class Event + { + public required string EventId { get; init; } + public required string EventAlias { get; init; } + + /// + /// The properties that will be redacted, and the replacement values. + /// Can be null, in which case the properties will be redacted with a default value + /// + public required Dictionary RedactedProperties { get; init; } + + public required string RedactedBy { get; init; } + public required string Reason { get; init; } + + public bool IsValid => !string.IsNullOrWhiteSpace(EventId) + && !string.IsNullOrWhiteSpace(EventAlias) + && RedactedProperties.Count > 0 + && !string.IsNullOrWhiteSpace(RedactedBy) + && !string.IsNullOrWhiteSpace(Reason); + } + + public static bool TryGet(CommittedEvent evt, [NotNullWhen(true)] out Redaction? redaction) + { + redaction = default; + if (!IsRedactionId(evt.Type.Id)) + { + return false; + } + + try + { + var payload = JsonSerializer.Deserialize(evt.Content); + if (payload is not { IsValid: true }) + { + return false; + } + + if (!Guid.TryParse(payload.EventId, out var redactedTypeId)) + { + return false; + } + + if (IsRedactionId(redactedTypeId)) + { + // Cannot redact a redaction. This is to prevent removing logs of what has been redacted + // As redactions themselves should not contain PII, this should not be a problem + return false; + } + + + redaction = new Redaction(evt.EventSource, payload, evt.EventLogSequenceNumber, redactedTypeId); + return true; + } + catch // Bad payload, ignore + { + return false; + } + } + + static bool IsRedactionId(Guid id) => + id.ToString().StartsWith(RedactedPrefix, StringComparison.InvariantCultureIgnoreCase); +}