Skip to content

Commit

Permalink
Merge pull request #776 from dolittle/gdpr-redactions
Browse files Browse the repository at this point in the history
GDPR redaction support
  • Loading branch information
mhelleborg authored Oct 23, 2024
2 parents e36f0d1 + c56b7a3 commit 22f4d8f
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
using System.Threading.Tasks;
using Dolittle.Runtime.Events.Store.EventHorizon;
using Dolittle.Runtime.Events.Store.MongoDB.Events;
using Dolittle.Runtime.Events.Store.MongoDB.Persistence;
using Dolittle.Runtime.Events.Store.MongoDB.Streams;
using Dolittle.Runtime.Events.Store.Streams;
using MongoDB.Driver;

namespace Dolittle.Runtime.Events.Store.MongoDB.EventHorizon;

Expand Down
8 changes: 8 additions & 0 deletions Source/Events.Store.MongoDB/Persistence/CommitWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public async Task<Try> Persist(Commit commit, CancellationToken cancellationToke
{
return new NoEventsToCommit();
}
var evt = eventsToStore.First();

using var session =
await _streams.StartSessionAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
Expand All @@ -68,6 +69,13 @@ await _streams.DefaultEventLog.InsertManyAsync(
eventsToStore,
cancellationToken: cancellationToken).ConfigureAwait(false);

// Perform any redactions
if (commit.Redactions.Count > 0)
{
await RedactionUtil.RedactEvents(session, _streams.DefaultEventLog, commit.Redactions, cancellationToken)
.ConfigureAwait(false);
}

var nextSequenceNumber = commit.LastSequenceNumber + 1;
await _offsetStore.UpdateOffset(Streams.Streams.EventLogCollectionName, session, nextSequenceNumber, cancellationToken)
.ConfigureAwait(false);
Expand Down
1 change: 0 additions & 1 deletion Source/Events.Store.MongoDB/Persistence/OffsetStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using Dolittle.Runtime.DependencyInversion.Scoping;
using Dolittle.Runtime.Events.Store.MongoDB.Events;
using Dolittle.Runtime.Events.Store.MongoDB.Migrations;
using MongoDB.Bson;
using MongoDB.Driver;

namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence;
Expand Down
173 changes: 173 additions & 0 deletions Source/Events.Store.MongoDB/Persistence/RedactionUtil.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dolittle.Runtime.Events.Store.Redactions;
using MongoDB.Bson;
using MongoDB.Driver;

namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence;

public static class RedactionUtil
{
/// <summary>
/// This will redact specific personal data from the event log
/// </summary>
/// <param name="session"></param>
/// <param name="collection"></param>
/// <param name="redactions"></param>
/// <param name="cancellationToken"></param>
public static async Task RedactEvents(IClientSessionHandle session, IMongoCollection<Events.Event> collection,
IReadOnlyCollection<Redaction> redactions, CancellationToken cancellationToken)
{
if (redactions.Count == 0) return;

var updateOperations = redactions.Select(redaction => redaction.CreateUpdateModel()).ToList();

await collection.BulkWriteAsync(session, updateOperations, new BulkWriteOptions
{
IsOrdered = true,
BypassDocumentValidation = false
}, cancellationToken: cancellationToken);
}

private static UpdateManyModel<Events.Event> CreateUpdateModel(this Redaction redaction)
{
var (updateDefinition, hasUpdateFilter) = CreateUpdateDefinition(redaction);
var matchesEventFilter = CreateFilter(redaction);
var filter = Builders<Events.Event>.Filter.And(matchesEventFilter, hasUpdateFilter);
return new UpdateManyModel<Events.Event>(filter, updateDefinition);
}

static (UpdateDefinition<Events.Event> updateDefinition, FilterDefinition<Events.Event> alreadyUpdatedFilter) CreateUpdateDefinition(this Redaction redaction)
{
var updates = new List<UpdateDefinition<Events.Event>>();
var filters = new List<FilterDefinition<Events.Event>>();

foreach (var (field, value) in redaction.Details.RedactedProperties)
{
var asBson = ToBsonValue(value);

FieldDefinition<Events.Event, BsonValue> contentField = $"Content.{field}";
if (asBson is null)
{
updates.Add(Builders<Events.Event>.Update.Unset(contentField));
filters.Add(Builders<Events.Event>.Filter.Exists(contentField));
}
else
{
updates.Add(Builders<Events.Event>.Update.Set(contentField, asBson));
filters.Add(Builders<Events.Event>.Filter.Ne(contentField, asBson));
}
}

// Point to the redaction event
updates.Add(Builders<Events.Event>.Update.AddToSet("RedactedBy", redaction.EventLogSequenceNumber.Value));

var matchesAnyFilter = Builders<Events.Event>.Filter.Or(filters);
return (Builders<Events.Event>.Update.Combine(updates), matchesAnyFilter);
}

static FilterDefinition<Events.Event> CreateFilter(this Redaction redaction)
{
return Builders<Events.Event>.Filter.And(
Builders<Events.Event>.Filter.Lt(evt => evt.EventLogSequenceNumber, redaction.EventLogSequenceNumber.Value),
Builders<Events.Event>.Filter.Eq(evt => evt.Metadata.EventSource, redaction.EventSourceId.Value),
Builders<Events.Event>.Filter.Eq(evt => evt.Metadata.TypeId, redaction.TypeId)
);
}

static BsonValue? ToBsonValue(object? element)
{
if (element is null)
{
return null;
}

if (element is JsonElement jsonElement)
{
return jsonElement.ToBsonValue();
}

switch (element)
{
case string str:
return new BsonString(str);

case int intValue:
return new BsonInt32(intValue);

case long longValue:
return new BsonInt64(longValue);

case double doubleValue:
return new BsonDouble(doubleValue);

case decimal decimalValue:
return new BsonDecimal128(decimalValue);

case bool boolValue:
return boolValue ? BsonBoolean.True : BsonBoolean.False;

default:
return null;
}
}


/// <summary>
/// Converts supported JSON values to BSON values
/// Does not support arrays or objects
/// </summary>
/// <param name="element"></param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
static BsonValue? ToBsonValue(this JsonElement element)
{
switch (element.ValueKind)
{
case JsonValueKind.String:
var str = element.GetString();

return new BsonString(str);

case JsonValueKind.Number:
if (element.TryGetInt32(out var intValue))
{
return new BsonInt32(intValue);
}

if (element.TryGetInt64(out var longValue))
{
return new BsonInt64(longValue);
}

if (element.TryGetDouble(out var doubleValue))
{
return new BsonDouble(doubleValue);
}

if (element.TryGetDecimal(out var decimalValue))
{
return new BsonDecimal128(decimalValue);
}

throw new ArgumentException("Unsupported numeric type");

case JsonValueKind.True:
return BsonBoolean.True;

case JsonValueKind.False:
return BsonBoolean.False;

case JsonValueKind.Null:
default:
return null;
}
}
}
1 change: 0 additions & 1 deletion Source/Events.Store.MongoDB/Streams/Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading.Tasks;
using Dolittle.Runtime.DependencyInversion.Lifecycle;
using Dolittle.Runtime.DependencyInversion.Scoping;
using Dolittle.Runtime.Events.Store.MongoDB.Events;
using Dolittle.Runtime.Events.Store.Streams;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
Expand Down
4 changes: 3 additions & 1 deletion Source/Events/Store/Actors/EventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Dolittle.Runtime.Actors;
Expand Down Expand Up @@ -137,7 +138,8 @@ public override Task<CommitExternalEventsResponse> CommitExternal(CommitExternal
Array.Empty<CommittedAggregateEvents>(),
committedEvents,
request.Event.EventLogSequenceNumber,
request.Event.EventLogSequenceNumber));
request.Event.EventLogSequenceNumber,
ImmutableList<Redactions.Redaction>.Empty)); // TODO: Consider redaction via external event

return Task.FromResult(new CommitExternalEventsResponse());
}
Expand Down
4 changes: 3 additions & 1 deletion Source/Events/Store/Persistence/Commit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ public record Commit(
IReadOnlyCollection<CommittedAggregateEvents> AggregateEvents,
IReadOnlyCollection<CommittedEvent> AllEvents,
EventLogSequenceNumber FirstSequenceNumber,
EventLogSequenceNumber LastSequenceNumber);
EventLogSequenceNumber LastSequenceNumber,
IReadOnlyCollection<Redactions.Redaction> Redactions
);
30 changes: 26 additions & 4 deletions Source/Events/Store/Persistence/CommitBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class CommitBuilder : ICanBuildABatch<Commit>
{
readonly List<CommittedEvents> _committedEvents = new();
readonly List<CommittedAggregateEvents> _committedAggregateEvents = new();
readonly List<Redactions.Redaction> _redactions = new();
readonly HashSet<Aggregate> _aggregates = new();
EventLogSequenceNumber _nextSequenceNumber;
readonly List<CommittedEvent> _orderedEvents = new();
Expand Down Expand Up @@ -76,6 +77,9 @@ public bool TryAddEventsFrom(CommitEventsRequest request, out CommittedEvents ev
_orderedEvents.AddRange(committedEvents);
_nextSequenceNumber = nextSequenceNumber;
eventsToBeCommitted = committedEvents;

AddRedactions(committedEvents);

return true;
}
catch (Exception ex)
Expand Down Expand Up @@ -127,6 +131,9 @@ public bool TryAddEventsFrom(CommitAggregateEventsRequest request, out Committed
_orderedEvents.AddRange(committedEvents);
_nextSequenceNumber = nextSequenceNumber;
eventsToBeCommitted = committedEvents;

AddRedactions(committedEvents);

return true;
}
catch (Exception ex)
Expand All @@ -135,20 +142,20 @@ public bool TryAddEventsFrom(CommitAggregateEventsRequest request, out Committed
return false;
}
}

/// <inheritdoc />
public Commit Build() => new (_committedEvents, _committedAggregateEvents, _orderedEvents, _initialSequenceNumber, _nextSequenceNumber - 1);
public Commit Build() => new (_committedEvents, _committedAggregateEvents, _orderedEvents, _initialSequenceNumber, _nextSequenceNumber - 1, _redactions);

bool TryAddCommittedAggregateEvents(CommittedAggregateEvents events, out Exception error)
{
error = default;
var aggregate = new Aggregate(events.AggregateRoot, events.EventSource);
if (_aggregates.Contains(aggregate))
if (!_aggregates.Add(aggregate))
{
error = new EventsForAggregateAlreadyAddedToCommit(aggregate);
return false;
}
_aggregates.Add(aggregate);

_committedAggregateEvents.Add(events);
return true;

Expand All @@ -164,4 +171,19 @@ bool TryAddCommittedAggregateEvents(CommittedAggregateEvents events, out Excepti
// //TODO: Update the aggregate root version range
// _committedAggregateEvents.Add(committedEvents);
}

/// <summary>
/// If the events contain valid redactions, add them to the redactions list.
/// </summary>
/// <param name="committedEvents"></param>
void AddRedactions(IEnumerable<CommittedEvent> committedEvents)
{
foreach (var evt in committedEvents)
{
if (Redactions.Redaction.TryGet(evt, out var redaction))
{
_redactions.Add(redaction);
}
}
}
}
Loading

0 comments on commit 22f4d8f

Please sign in to comment.