diff --git a/YDotNet.Native/YDotNet.Native.csproj b/YDotNet.Native/YDotNet.Native.csproj deleted file mode 100644 index cfadb03d..00000000 --- a/YDotNet.Native/YDotNet.Native.csproj +++ /dev/null @@ -1,9 +0,0 @@ - - - - net7.0 - enable - enable - - - diff --git a/YDotNet.Server.Clustering/ClusteringCallback.cs b/YDotNet.Server.Clustering/ClusteringCallback.cs deleted file mode 100644 index 417c1bb0..00000000 --- a/YDotNet.Server.Clustering/ClusteringCallback.cs +++ /dev/null @@ -1,195 +0,0 @@ -using Microsoft.Extensions.Options; -using ProtoBuf; -using StackExchange.Redis; -using YDotNet.Server.Clustering.Internal; - -namespace YDotNet.Server.Clustering; - -public sealed class RedisClusteringCallback : IDocumentCallback, IDisposable -{ - private readonly Guid senderId = Guid.NewGuid(); - private readonly RedisClusteringOptions redisOptions; - private readonly PublishQueue subscriberQueue; - private ISubscriber? subscriber; - private IDocumentManager? documentManager; - - public RedisClusteringCallback(IOptions redisOptions, RedisConnection redisConnection) - { - this.redisOptions = redisOptions.Value; - - subscriberQueue = new PublishQueue( - this.redisOptions.MaxBatchCount, - this.redisOptions.MaxBatchSize, - (int)this.redisOptions.DebounceTime.TotalMilliseconds, - PublishBatchAsync); - - _ = InitializeAsync(redisConnection); - } - - public async Task InitializeAsync(RedisConnection redisConnection) - { - // Use a single task, so that the ordering of registrations does not matter. - var connection = await redisConnection.Instance; - - subscriber = connection.GetSubscriber(); - subscriber.Subscribe(redisOptions.Channel, async (_, value) => - { - await HandleMessage(value); - }); - } - - public void Dispose() - { - subscriberQueue.Dispose(); - subscriber?.UnsubscribeAll(); - } - - public ValueTask OnInitializedAsync( - IDocumentManager manager) - { - // The initialize method is used to prevent circular dependencies between managers and hooks. - documentManager = manager; - return default; - } - - private async Task HandleMessage(RedisValue value) - { - if (documentManager == null) - { - return; - } - - var batch = Serializer.Deserialize(value); - - if (batch == null) - { - return; - } - - foreach (var message in batch) - { - if (message.SenderId == senderId) - { - continue; - } - - var context = new DocumentContext(message.DocumentName, message.ClientId) - { - Metadata = senderId - }; - - switch (message.Type) - { - case MessageType.ClientPinged: - await documentManager.PingAsync(context, message.ClientClock, message.ClientState); - break; - case MessageType.ClientDisconnected: - await documentManager.DisconnectAsync(context); - break; - case MessageType.Update when message.Data != null: - await documentManager.ApplyUpdateAsync(context, message.Data); - break; - case MessageType.SyncStep2 when message.Data != null: - await documentManager.ApplyUpdateAsync(context, message.Data); - break; - case MessageType.SyncStep1 when message.Data != null: - await SendSync2Async(context, message.Data); - break; - case MessageType.AwarenessRequested: - foreach (var (id, user) in await documentManager.GetAwarenessAsync(context)) - { - var userContext = context with { ClientId = id }; - - await SendAwarenessAsync(userContext, user.ClientState, user.ClientClock); - } - break; - } - } - } - - public ValueTask OnDocumentLoadedAsync(DocumentLoadEvent @event) - { - // Run these callbacks in another thread because it could cause deadlocks if it would interact with the same document. - _ = Task.Run(async () => - { - await SendAwarenessRequest(@event.Context); - await SendSync1Async(@event.Context); - }); - - return default; - } - - public async ValueTask OnAwarenessUpdatedAsync(ClientAwarenessEvent @event) - { - await SendAwarenessAsync(@event.Context, @event.ClientState, @event.ClientClock); - } - - public ValueTask OnClientDisconnectedAsync(ClientDisconnectedEvent @event) - { - var m = new Message { Type = MessageType.ClientDisconnected }; - - return EnqueueAsync(m, @event.Context); - } - - public ValueTask OnDocumentChangedAsync(DocumentChangedEvent @event) - { - var m = new Message { Type = MessageType.Update, Data = @event.Diff }; - - return EnqueueAsync(m, @event.Context); - } - - private ValueTask SendAwarenessAsync(DocumentContext context, string? state, long clock) - { - var m = new Message { Type = MessageType.ClientPinged, ClientState = state, ClientClock = clock }; - - return EnqueueAsync(m, context); - } - - private async ValueTask SendSync1Async(DocumentContext context) - { - var state = await documentManager!.GetStateVectorAsync(context); - - var m = new Message { Type = MessageType.SyncStep1, Data = state }; - - await EnqueueAsync(m, context); - } - - private async ValueTask SendSync2Async(DocumentContext context, byte[] stateVector) - { - var state = await documentManager!.GetUpdateAsync(context, stateVector); - - var m = new Message { Type = MessageType.SyncStep2, Data = state }; - - await EnqueueAsync(m, context); - } - - private ValueTask SendAwarenessRequest(DocumentContext context) - { - var m = new Message { Type = MessageType.AwarenessRequested }; - - return EnqueueAsync(m, context); - } - - private ValueTask EnqueueAsync(Message message, DocumentContext context) - { - message.ClientId = context.ClientId; - message.DocumentName = context.DocumentName; - message.SenderId = senderId; - - return subscriberQueue.EnqueueAsync(message, default); - } - - private async Task PublishBatchAsync(List batch, CancellationToken ct) - { - if (subscriber == null) - { - return; - } - - using var stream = new MemoryStream(); - - Serializer.Serialize(stream, batch); - - await subscriber.PublishAsync(redisOptions.Channel, stream.ToArray()); - } -} diff --git a/YDotNet.Server.Clustering/ClusteringOptions.cs b/YDotNet.Server.Clustering/ClusteringOptions.cs deleted file mode 100644 index 83848393..00000000 --- a/YDotNet.Server.Clustering/ClusteringOptions.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace YDotNet.Server.Clustering; - -public sealed class ClusteringOptions -{ - public TimeSpan DebounceTime { get; set; } = TimeSpan.FromMilliseconds(500); - - public int MaxBatchCount { get; set; } = 100; - - public int MaxBatchSize { get; set; } = 1024 * 1024; -} diff --git a/YDotNet.Server.Clustering/Internal/LoggerTextWriter.cs b/YDotNet.Server.Clustering/Internal/LoggerTextWriter.cs deleted file mode 100644 index 18abbbaf..00000000 --- a/YDotNet.Server.Clustering/Internal/LoggerTextWriter.cs +++ /dev/null @@ -1,30 +0,0 @@ -using Microsoft.Extensions.Logging; -using System.Text; - -namespace YDotNet.Server.Clustering.Internal; - -internal sealed class LoggerTextWriter : TextWriter -{ - private readonly ILogger log; - - public LoggerTextWriter(ILogger log) - { - this.log = log; - } - - public override Encoding Encoding => Encoding.UTF8; - - public override void Write(char value) - { - } - - public override void WriteLine(string? value) - { - if (log.IsEnabled(LogLevel.Debug)) - { -#pragma warning disable CA2254 // Template should be a static expression - log.LogDebug(new EventId(100, "RedisConnectionLog"), value); -#pragma warning restore CA2254 // Template should be a static expression - } - } -} diff --git a/YDotNet.Server.Clustering/Internal/PublishQueue.cs b/YDotNet.Server.Clustering/Internal/PublishQueue.cs deleted file mode 100644 index 379cb5c3..00000000 --- a/YDotNet.Server.Clustering/Internal/PublishQueue.cs +++ /dev/null @@ -1,85 +0,0 @@ -using System.Threading.Channels; - -namespace YDotNet.Server.Clustering.Internal; - -public interface ICanEstimateSize -{ - int EstimateSize(); -} - -public sealed class PublishQueue where T : ICanEstimateSize -{ - private readonly Channel inputChannel = Channel.CreateBounded(100); - private readonly Channel> outputChannel = Channel.CreateBounded>(2); - private readonly CancellationTokenSource cts = new(); - - public PublishQueue(int maxCount, int maxSize, int timeout, Func, CancellationToken, Task> handler) - { - Task.Run(async () => - { - var batchList = new List(maxCount); - var batchSize = 0; - - // Just a marker object to force sending out new batches. - var force = new object(); - - await using var timer = new Timer(_ => inputChannel.Writer.TryWrite(force)); - - async Task TrySendAsync() - { - if (batchList.Count > 0) - { - await outputChannel.Writer.WriteAsync(batchList, cts.Token); - - // Create a new batch, because the value is shared and might be processes by another concurrent task. - batchList = new List(); - batchSize = 0; - } - } - - // Exceptions usually that the process was stopped and the channel closed, therefore we do not catch them. - await foreach (var item in inputChannel.Reader.ReadAllAsync(cts.Token)) - { - if (ReferenceEquals(item, force)) - { - // Our item is the marker object from the timer. - await TrySendAsync(); - } - else if (item is T typed) - { - // The timeout restarts with the last event and should push events out if no further events are received. - timer.Change(timeout, Timeout.Infinite); - - batchList.Add(typed); - batchSize += typed.EstimateSize(); - - if (batchList.Count >= maxSize || batchSize >= maxSize) - { - await TrySendAsync(); - } - } - } - - await TrySendAsync(); - }, cts.Token).ContinueWith(x => outputChannel.Writer.TryComplete(x.Exception)); - - Task.Run(async () => - { - await foreach (var batch in outputChannel.Reader.ReadAllAsync(cts.Token)) - { - await handler(batch, cts.Token); - } - }, cts.Token); - } - - public ValueTask EnqueueAsync(T item, - CancellationToken ct) - { - return inputChannel.Writer.WriteAsync(item, ct); - } - - public void Dispose() - { - cts.Cancel(); - } -} diff --git a/YDotNet.Server.Clustering/Messages.cs b/YDotNet.Server.Clustering/Messages.cs deleted file mode 100644 index 59f3d709..00000000 --- a/YDotNet.Server.Clustering/Messages.cs +++ /dev/null @@ -1,56 +0,0 @@ -using ProtoBuf; -using YDotNet.Server.Clustering.Internal; - -#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. - -namespace YDotNet.Server.Clustering; - -[ProtoContract] -public enum MessageType -{ - ClientPinged, - ClientDisconnected, - AwarenessRequested, - Update, - SyncStep1, - SyncStep2, -} - -[ProtoContract] -public sealed class Message : ICanEstimateSize -{ - private static readonly int GuidLength = Guid.Empty.ToString().Length; - - [ProtoMember(1)] - public MessageType Type { get; set; } - - [ProtoMember(1)] - public Guid SenderId { get; set; } - - [ProtoMember(2)] - public string DocumentName { get; set; } - - [ProtoMember(3)] - public long ClientId { get; set; } - - [ProtoMember(4)] - public long ClientClock { get; set; } - - [ProtoMember(5)] - public string? ClientState { get; set; } - - [ProtoMember(6)] - public byte[]? Data { get; set; } - - public int EstimateSize() - { - var size = - GuidLength + - sizeof(long) + - sizeof(long) + - DocumentName.Length + - Data?.Length ?? 0; - - return size; - } -} diff --git a/YDotNet.Server.Clustering/RedisConnection.cs b/YDotNet.Server.Clustering/RedisConnection.cs deleted file mode 100644 index 452bbab9..00000000 --- a/YDotNet.Server.Clustering/RedisConnection.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using StackExchange.Redis; -using YDotNet.Server.Clustering.Internal; - -namespace YDotNet.Server.Clustering; - -public sealed class RedisConnection : IDisposable -{ - public Task Instance { get; } - - public RedisConnection(IOptions options, ILogger logger) - { - Instance = options.Value.ConnectAsync(new LoggerTextWriter(logger)); - } - - public void Dispose() - { - if (Instance.IsCompletedSuccessfully) - { - Instance.Result.Close(); - } - } -} diff --git a/YDotNet.Server.Clustering/RedisDocumentStorage.cs b/YDotNet.Server.Clustering/RedisDocumentStorage.cs deleted file mode 100644 index fc77f59f..00000000 --- a/YDotNet.Server.Clustering/RedisDocumentStorage.cs +++ /dev/null @@ -1,60 +0,0 @@ -using Microsoft.Extensions.Options; -using StackExchange.Redis; -using YDotNet.Server.Storage; - -namespace YDotNet.Server.Clustering; - -public sealed class RedisDocumentStorage : IDocumentStorage -{ - private readonly RedisDocumentStorageOptions redisOptions; - private IDatabase? database; - - public RedisDocumentStorage(IOptions redisOptions, RedisConnection redisConnection) - { - this.redisOptions = redisOptions.Value; - - _ = InitializeAsync(redisConnection); - } - - private async Task InitializeAsync(RedisConnection redisConnection) - { - // Use a single task, so that the ordering of registrations does not matter. - var connection = await redisConnection.Instance; - - database = connection.GetDatabase(redisOptions.Database); - } - - public async ValueTask GetDocAsync(string name, - CancellationToken ct = default) - { - if (database == null) - { - return null; - } - - var item = await database.StringGetAsync(Key(name)); - - if (item == RedisValue.Null) - { - return null; - } - - return item; - } - - public async ValueTask StoreDocAsync(string name, byte[] doc, - CancellationToken ct = default) - { - if (database == null) - { - return; - } - - await database.StringSetAsync(Key(name), doc, redisOptions.Expiration?.Invoke(name)); - } - - private string Key(string name) - { - return redisOptions.Prefix + name; - } -} diff --git a/YDotNet.Server.Clustering/RedisDocumentStorageOptions.cs b/YDotNet.Server.Clustering/RedisDocumentStorageOptions.cs deleted file mode 100644 index 6e52e820..00000000 --- a/YDotNet.Server.Clustering/RedisDocumentStorageOptions.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace YDotNet.Server.Clustering; - -public sealed class RedisDocumentStorageOptions -{ - public Func? Expiration { get; set; } - - public int Database { get; set; } - - public string Prefix { get; set; } = "YDotNetDocument_"; -} diff --git a/YDotNet.Server.Clustering/RedisOptions.cs b/YDotNet.Server.Clustering/RedisOptions.cs deleted file mode 100644 index 11e56ca2..00000000 --- a/YDotNet.Server.Clustering/RedisOptions.cs +++ /dev/null @@ -1,25 +0,0 @@ -using StackExchange.Redis; - -namespace YDotNet.Server.Clustering; - -public sealed class RedisOptions -{ - public ConfigurationOptions? Configuration { get; set; } - - public Func>? ConnectionFactory { get; set; } - - internal async Task ConnectAsync(TextWriter log) - { - if (ConnectionFactory != null) - { - return await ConnectionFactory(log); - } - - if (Configuration != null) - { - return await ConnectionMultiplexer.ConnectAsync(Configuration, log); - } - - throw new InvalidOperationException("Either configuration or connection factory must be set."); - } -} diff --git a/YDotNet.Server.Clustering/ServiceExtensions.cs b/YDotNet.Server.Clustering/ServiceExtensions.cs deleted file mode 100644 index a3586aef..00000000 --- a/YDotNet.Server.Clustering/ServiceExtensions.cs +++ /dev/null @@ -1,32 +0,0 @@ -using YDotNet.Server; -using YDotNet.Server.Clustering; -using YDotNet.Server.Storage; - -namespace Microsoft.Extensions.DependencyInjection; - -public static class ServiceExtensions -{ - public static YDotnetRegistration AddRedis(this YDotnetRegistration registration, Action? configure = null) - { - registration.Services.Configure(configure ?? (x => { })); - registration.Services.AddSingleton(); - - return registration; - } - - public static YDotnetRegistration AddRedisClustering(this YDotnetRegistration registration, Action? configure = null) - { - registration.Services.Configure(configure ?? (x => { })); - registration.Services.AddSingleton(); - - return registration; - } - - public static YDotnetRegistration AddRedisStorage(this YDotnetRegistration registration, Action? configure = null) - { - registration.Services.Configure(configure ?? (x => { })); - registration.Services.AddSingleton(); - - return registration; - } -} diff --git a/YDotNet.Server.Clustering/YDotNet.Server.Clustering.csproj b/YDotNet.Server.Clustering/YDotNet.Server.Clustering.csproj deleted file mode 100644 index a1d898ca..00000000 --- a/YDotNet.Server.Clustering/YDotNet.Server.Clustering.csproj +++ /dev/null @@ -1,19 +0,0 @@ - - - - net7.0 - enable - enable - - - - - - - - - - - - -