From 34cce9ef420e241f96eb57ada5c1df0277bca7dc Mon Sep 17 00:00:00 2001 From: Alex Yakunin Date: Tue, 12 Sep 2023 06:04:39 -0700 Subject: [PATCH] fix: abort streams on (re)connection to another hub --- src/Stl.Fusion/Operations/AgentInfo.cs | 3 +- src/Stl.Rpc/Infrastructure/RpcSharedStream.cs | 21 +++++--- .../Infrastructure/RpcSystemCallSender.cs | 4 +- src/Stl.Rpc/Infrastructure/RpcSystemCalls.cs | 6 +-- src/Stl.Rpc/RpcHub.cs | 15 +++++- src/Stl.Rpc/RpcStream.cs | 54 +++++++++++++++---- 6 files changed, 80 insertions(+), 23 deletions(-) diff --git a/src/Stl.Fusion/Operations/AgentInfo.cs b/src/Stl.Fusion/Operations/AgentInfo.cs index e142920c2..d888f6216 100644 --- a/src/Stl.Fusion/Operations/AgentInfo.cs +++ b/src/Stl.Fusion/Operations/AgentInfo.cs @@ -1,3 +1,4 @@ +using System.Globalization; using Stl.OS; namespace Stl.Fusion.Operations; @@ -8,6 +9,6 @@ public record AgentInfo(Symbol Id) private static long GetNextId() => Interlocked.Increment(ref _nextId); public AgentInfo() - : this($"{RuntimeInfo.Process.MachinePrefixedId}-{GetNextId()}") + : this($"{RuntimeInfo.Process.MachinePrefixedId.Value}-{GetNextId().ToString(CultureInfo.InvariantCulture)}") { } } diff --git a/src/Stl.Rpc/Infrastructure/RpcSharedStream.cs b/src/Stl.Rpc/Infrastructure/RpcSharedStream.cs index fe61a2446..f4feaa709 100644 --- a/src/Stl.Rpc/Infrastructure/RpcSharedStream.cs +++ b/src/Stl.Rpc/Infrastructure/RpcSharedStream.cs @@ -34,7 +34,7 @@ void IRpcObject.OnMissing() public void OnKeepAlive() => LastKeepAliveAt = CpuTimestamp.Now; - public abstract Task OnAck(long nextIndex, bool mustReset); + public abstract Task OnAck(long nextIndex, string? resetKey); } public sealed class RpcSharedStream(RpcStream stream) : RpcSharedStream(stream) @@ -59,13 +59,22 @@ protected override async Task DisposeAsyncCore() } } - public override Task OnAck(long nextIndex, bool mustReset) + public override Task OnAck(long nextIndex, string? resetKey) { + var mustReset = !ReferenceEquals(resetKey, null); + if (mustReset && !Equals(Stream.ResetKey, resetKey)) + return SendNotFound(nextIndex, nextIndex); + + LastKeepAliveAt = CpuTimestamp.Now; lock (Lock) { - LastKeepAliveAt = CpuTimestamp.Now; - if (WhenRunning == null) - this.Start(); - else if (WhenRunning.IsCompleted) + var whenRunning = WhenRunning; + if (whenRunning == null) { + if (mustReset && nextIndex == 0) + this.Start(); + else + return SendNotFound(nextIndex, nextIndex); + } + else if (whenRunning.IsCompleted) return SendNotFound(nextIndex, nextIndex); _acks.Writer.TryWrite((nextIndex, mustReset)); // Must always succeed for unbounded channel diff --git a/src/Stl.Rpc/Infrastructure/RpcSystemCallSender.cs b/src/Stl.Rpc/Infrastructure/RpcSystemCallSender.cs index df7f907a4..02824265c 100644 --- a/src/Stl.Rpc/Infrastructure/RpcSystemCallSender.cs +++ b/src/Stl.Rpc/Infrastructure/RpcSystemCallSender.cs @@ -113,13 +113,13 @@ public Task MissingObjects(RpcPeer peer, long[] objectIds, List? head // Streams - public Task Ack(RpcPeer peer, long objectId, long nextIndex, bool mustReset, List? headers = null) + public Task Ack(RpcPeer peer, long objectId, long nextIndex, string? resetKey, List? headers = null) { var context = new RpcOutboundContext(headers) { Peer = peer, RelatedCallId = objectId, }; - var call = context.PrepareCall(AckMethodDef, ArgumentList.New(nextIndex, mustReset))!; + var call = context.PrepareCall(AckMethodDef, ArgumentList.New(nextIndex, resetKey))!; return call.SendNoWait(false); } diff --git a/src/Stl.Rpc/Infrastructure/RpcSystemCalls.cs b/src/Stl.Rpc/Infrastructure/RpcSystemCalls.cs index a184041d8..3170c0376 100644 --- a/src/Stl.Rpc/Infrastructure/RpcSystemCalls.cs +++ b/src/Stl.Rpc/Infrastructure/RpcSystemCalls.cs @@ -16,7 +16,7 @@ public interface IRpcSystemCalls : IRpcSystemService Task MissingObjects(long[] objectIds); // Streams - Task Ack(long nextIndex, bool mustReset); + Task Ack(long nextIndex, string? resetKey); Task I(long index, int ackOffset, object? item); Task End(long index, ExceptionInfo error); } @@ -77,13 +77,13 @@ public Task MissingObjects(long[] objectIds) return RpcNoWait.Tasks.Completed; } - public async Task Ack(long nextIndex, bool mustReset) + public async Task Ack(long nextIndex, string? resetKey) { var context = RpcInboundContext.GetCurrent(); var peer = context.Peer; var objectId = context.Message.RelatedId; if (peer.SharedObjects.Get(objectId) is RpcSharedStream stream) - await stream.OnAck(nextIndex, mustReset).ConfigureAwait(false); + await stream.OnAck(nextIndex, resetKey).ConfigureAwait(false); else await peer.Hub.SystemCallSender.MissingObjects(peer, new[] { objectId }).ConfigureAwait(false); return default; diff --git a/src/Stl.Rpc/RpcHub.cs b/src/Stl.Rpc/RpcHub.cs index e8e1ac2ff..9b6c9b736 100644 --- a/src/Stl.Rpc/RpcHub.cs +++ b/src/Stl.Rpc/RpcHub.cs @@ -1,11 +1,15 @@ +using System.Globalization; +using Stl.OS; using Stl.Rpc.Infrastructure; using Stl.Rpc.Internal; using Errors = Stl.Internal.Errors; namespace Stl.Rpc; -public sealed class RpcHub : ProcessorBase, IHasServices +public sealed class RpcHub : ProcessorBase, IHasServices, IHasId { + private static long _lastId; + private RpcServiceRegistry? _serviceRegistry; private IEnumerable? _peerTrackers; private RpcSystemCallSender? _systemCallSender; @@ -32,6 +36,7 @@ public sealed class RpcHub : ProcessorBase, IHasServices internal ConcurrentDictionary Peers { get; } = new(); + public Symbol Id { get; init; } = NextId(); public IServiceProvider Services { get; } public RpcConfiguration Configuration { get; } public RpcServiceRegistry ServiceRegistry => _serviceRegistry ??= Services.GetRequiredService(); @@ -90,4 +95,12 @@ public RpcClientPeer GetClientPeer(RpcPeerRef peerRef) public RpcServerPeer GetServerPeer(RpcPeerRef peerRef) => (RpcServerPeer)GetPeer(peerRef.RequireServer()); + + // Private methods + + private static Symbol NextId() + { + var id = Interlocked.Increment(ref _lastId); + return $"{RuntimeInfo.Process.MachinePrefixedId.Value}-{id.ToString(CultureInfo.InvariantCulture)}"; + } } diff --git a/src/Stl.Rpc/RpcStream.cs b/src/Stl.Rpc/RpcStream.cs index d0fa8ee0b..ffa03746d 100644 --- a/src/Stl.Rpc/RpcStream.cs +++ b/src/Stl.Rpc/RpcStream.cs @@ -17,13 +17,14 @@ public abstract partial class RpcStream : IRpcObject AllowSynchronousContinuations = false, // We don't want sync handlers to "clog" the call processing loop }; - [DataMember, MemoryPackOrder(1)] - public int AckDistance { get; init; } = 30; [DataMember, MemoryPackOrder(2)] + public int AckDistance { get; init; } = 30; + [DataMember, MemoryPackOrder(3)] public int AdvanceDistance { get; init; } = 61; // Non-serialized members [JsonIgnore, MemoryPackIgnore] public long Id { get; protected set; } + [JsonIgnore, MemoryPackIgnore] public string? ResetKey { get; protected set; } [JsonIgnore, MemoryPackIgnore] public RpcPeer? Peer { get; protected set; } [JsonIgnore, MemoryPackIgnore] public abstract Type ItemType { get; } [JsonIgnore, MemoryPackIgnore] public abstract RpcObjectKind Kind { get; } @@ -69,7 +70,7 @@ public long SerializedId { if (Id > 0) // Already registered return Id; - Peer = RpcOutboundContext.Current?.Peer ?? RpcInboundContext.GetCurrent().Peer; + Peer ??= RpcOutboundContext.Current?.Peer ?? RpcInboundContext.GetCurrent().Peer; var sharedObjects = Peer.SharedObjects; Id = sharedObjects.NextId(); // NOTE: Id changes on serialization! var sharedStream = new RpcSharedStream(this); @@ -77,8 +78,14 @@ public long SerializedId { return Id; } set { + this.RequireKind(RpcObjectKind.Remote); lock (_lock) { - this.RequireKind(RpcObjectKind.Remote); + if (Id != 0) { + if (Id == value) + return; + throw Errors.AlreadyInitialized(nameof(SerializedId)); + } + Id = value; Peer = RpcInboundContext.GetCurrent().Peer; _isRemoteObjectRegistered = true; @@ -87,6 +94,30 @@ public long SerializedId { } } + [DataMember, MemoryPackOrder(1)] + public string SerializedResetKey { + get { + // This member must be never accessed directly - its only purpose is to be called on serialization + this.RequireKind(RpcObjectKind.Local); + if (!ReferenceEquals(ResetKey, null)) // Already acquired + return ResetKey; + + Peer ??= RpcOutboundContext.Current?.Peer ?? RpcInboundContext.GetCurrent().Peer; + return ResetKey = Peer.Hub.Id.Value; + } + set { + this.RequireKind(RpcObjectKind.Remote); + lock (_lock) { + if (!ReferenceEquals(ResetKey, null)) { + if (Equals(ResetKey, value)) + return; + throw Errors.AlreadyInitialized(nameof(SerializedResetKey)); + } + ResetKey = value; + } + } + } + [JsonIgnore] public override Type ItemType => typeof(T); [JsonIgnore] public override RpcObjectKind Kind => _localSource != null ? RpcObjectKind.Local : RpcObjectKind.Remote; @@ -141,7 +172,7 @@ protected internal override Task OnItem(long index, long ackIndex, object? item) return Task.CompletedTask; if (index > _nextIndex) - return SendAckFromLock(_nextIndex, true); + return SendResetFromLock(_nextIndex); // Debug.WriteLine($"{Id}: +#{index} (ack @ {ackIndex})"); _nextIndex++; @@ -161,7 +192,7 @@ protected internal override Task OnEnd(long index, Exception? error) return Task.CompletedTask; if (index > _nextIndex) - return SendAckFromLock(_nextIndex, true); + return SendResetFromLock(_nextIndex); // Debug.WriteLine($"{Id}: +{index} (ended!)"); CloseFromLock(error); @@ -172,7 +203,7 @@ protected internal override Task OnEnd(long index, Exception? error) protected override Task OnReconnected(CancellationToken cancellationToken) { lock (_lock) - return _remoteChannel != null ? SendAckFromLock(_nextIndex, true) : Task.CompletedTask; + return _remoteChannel != null ? SendResetFromLock(_nextIndex) : Task.CompletedTask; } protected override void OnMissing() @@ -210,14 +241,17 @@ private void CloseFromLock(Exception? error) private Task SendCloseFromLock() { _nextIndex = int.MaxValue; - return SendAckFromLock(_nextIndex); + return SendAckFromLock(_nextIndex, true); } + private Task SendResetFromLock(long index) + => SendAckFromLock(index, true); + private Task SendAckFromLock(long index, bool mustReset = false) { // Debug.WriteLine($"{Id}: <- ACK: ({index}, {mustReset})"); return !_isMissing - ? Peer!.Hub.SystemCallSender.Ack(Peer, Id, index, mustReset) + ? Peer!.Hub.SystemCallSender.Ack(Peer, Id, index, mustReset ? ResetKey ?? "" : null) : Task.CompletedTask; } @@ -273,7 +307,7 @@ async ValueTask MoveNext() try { if (!_isStarted) { _isStarted = true; - await _stream.SendAckFromLock(0).ConfigureAwait(false); + await _stream.SendResetFromLock(0).ConfigureAwait(false); } if (!await _reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false)) { _isEnded = true;