Skip to content

Commit

Permalink
fix: abort streams on (re)connection to another hub
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Sep 12, 2023
1 parent afb2274 commit 34cce9e
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 23 deletions.
3 changes: 2 additions & 1 deletion src/Stl.Fusion/Operations/AgentInfo.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Globalization;
using Stl.OS;

namespace Stl.Fusion.Operations;
Expand All @@ -8,6 +9,6 @@ public record AgentInfo(Symbol Id)
private static long GetNextId() => Interlocked.Increment(ref _nextId);

public AgentInfo()
: this($"{RuntimeInfo.Process.MachinePrefixedId}-{GetNextId()}")
: this($"{RuntimeInfo.Process.MachinePrefixedId.Value}-{GetNextId().ToString(CultureInfo.InvariantCulture)}")
{ }
}
21 changes: 15 additions & 6 deletions src/Stl.Rpc/Infrastructure/RpcSharedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void IRpcObject.OnMissing()
public void OnKeepAlive()
=> LastKeepAliveAt = CpuTimestamp.Now;

public abstract Task OnAck(long nextIndex, bool mustReset);
public abstract Task OnAck(long nextIndex, string? resetKey);
}

public sealed class RpcSharedStream<T>(RpcStream stream) : RpcSharedStream(stream)
Expand All @@ -59,13 +59,22 @@ protected override async Task DisposeAsyncCore()
}
}

public override Task OnAck(long nextIndex, bool mustReset)
public override Task OnAck(long nextIndex, string? resetKey)
{
var mustReset = !ReferenceEquals(resetKey, null);
if (mustReset && !Equals(Stream.ResetKey, resetKey))
return SendNotFound(nextIndex, nextIndex);

LastKeepAliveAt = CpuTimestamp.Now;
lock (Lock) {
LastKeepAliveAt = CpuTimestamp.Now;
if (WhenRunning == null)
this.Start();
else if (WhenRunning.IsCompleted)
var whenRunning = WhenRunning;
if (whenRunning == null) {
if (mustReset && nextIndex == 0)
this.Start();
else
return SendNotFound(nextIndex, nextIndex);
}
else if (whenRunning.IsCompleted)
return SendNotFound(nextIndex, nextIndex);

_acks.Writer.TryWrite((nextIndex, mustReset)); // Must always succeed for unbounded channel
Expand Down
4 changes: 2 additions & 2 deletions src/Stl.Rpc/Infrastructure/RpcSystemCallSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ public Task MissingObjects(RpcPeer peer, long[] objectIds, List<RpcHeader>? head

// Streams

public Task Ack(RpcPeer peer, long objectId, long nextIndex, bool mustReset, List<RpcHeader>? headers = null)
public Task Ack(RpcPeer peer, long objectId, long nextIndex, string? resetKey, List<RpcHeader>? headers = null)
{
var context = new RpcOutboundContext(headers) {
Peer = peer,
RelatedCallId = objectId,
};
var call = context.PrepareCall(AckMethodDef, ArgumentList.New(nextIndex, mustReset))!;
var call = context.PrepareCall(AckMethodDef, ArgumentList.New(nextIndex, resetKey))!;
return call.SendNoWait(false);
}

Expand Down
6 changes: 3 additions & 3 deletions src/Stl.Rpc/Infrastructure/RpcSystemCalls.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IRpcSystemCalls : IRpcSystemService
Task<RpcNoWait> MissingObjects(long[] objectIds);

// Streams
Task<RpcNoWait> Ack(long nextIndex, bool mustReset);
Task<RpcNoWait> Ack(long nextIndex, string? resetKey);
Task<RpcNoWait> I(long index, int ackOffset, object? item);
Task<RpcNoWait> End(long index, ExceptionInfo error);
}
Expand Down Expand Up @@ -77,13 +77,13 @@ public Task<RpcNoWait> MissingObjects(long[] objectIds)
return RpcNoWait.Tasks.Completed;
}

public async Task<RpcNoWait> Ack(long nextIndex, bool mustReset)
public async Task<RpcNoWait> Ack(long nextIndex, string? resetKey)
{
var context = RpcInboundContext.GetCurrent();
var peer = context.Peer;
var objectId = context.Message.RelatedId;
if (peer.SharedObjects.Get(objectId) is RpcSharedStream stream)
await stream.OnAck(nextIndex, mustReset).ConfigureAwait(false);
await stream.OnAck(nextIndex, resetKey).ConfigureAwait(false);
else
await peer.Hub.SystemCallSender.MissingObjects(peer, new[] { objectId }).ConfigureAwait(false);
return default;
Expand Down
15 changes: 14 additions & 1 deletion src/Stl.Rpc/RpcHub.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
using System.Globalization;
using Stl.OS;
using Stl.Rpc.Infrastructure;
using Stl.Rpc.Internal;
using Errors = Stl.Internal.Errors;

namespace Stl.Rpc;

public sealed class RpcHub : ProcessorBase, IHasServices
public sealed class RpcHub : ProcessorBase, IHasServices, IHasId<Symbol>
{
private static long _lastId;

private RpcServiceRegistry? _serviceRegistry;
private IEnumerable<RpcPeerTracker>? _peerTrackers;
private RpcSystemCallSender? _systemCallSender;
Expand All @@ -32,6 +36,7 @@ public sealed class RpcHub : ProcessorBase, IHasServices

internal ConcurrentDictionary<RpcPeerRef, RpcPeer> Peers { get; } = new();

public Symbol Id { get; init; } = NextId();
public IServiceProvider Services { get; }
public RpcConfiguration Configuration { get; }
public RpcServiceRegistry ServiceRegistry => _serviceRegistry ??= Services.GetRequiredService<RpcServiceRegistry>();
Expand Down Expand Up @@ -90,4 +95,12 @@ public RpcClientPeer GetClientPeer(RpcPeerRef peerRef)

public RpcServerPeer GetServerPeer(RpcPeerRef peerRef)
=> (RpcServerPeer)GetPeer(peerRef.RequireServer());

// Private methods

private static Symbol NextId()
{
var id = Interlocked.Increment(ref _lastId);
return $"{RuntimeInfo.Process.MachinePrefixedId.Value}-{id.ToString(CultureInfo.InvariantCulture)}";
}
}
54 changes: 44 additions & 10 deletions src/Stl.Rpc/RpcStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ public abstract partial class RpcStream : IRpcObject
AllowSynchronousContinuations = false, // We don't want sync handlers to "clog" the call processing loop
};

[DataMember, MemoryPackOrder(1)]
public int AckDistance { get; init; } = 30;
[DataMember, MemoryPackOrder(2)]
public int AckDistance { get; init; } = 30;
[DataMember, MemoryPackOrder(3)]
public int AdvanceDistance { get; init; } = 61;

// Non-serialized members
[JsonIgnore, MemoryPackIgnore] public long Id { get; protected set; }
[JsonIgnore, MemoryPackIgnore] public string? ResetKey { get; protected set; }
[JsonIgnore, MemoryPackIgnore] public RpcPeer? Peer { get; protected set; }
[JsonIgnore, MemoryPackIgnore] public abstract Type ItemType { get; }
[JsonIgnore, MemoryPackIgnore] public abstract RpcObjectKind Kind { get; }
Expand Down Expand Up @@ -69,16 +70,22 @@ public long SerializedId {
if (Id > 0) // Already registered
return Id;

Peer = RpcOutboundContext.Current?.Peer ?? RpcInboundContext.GetCurrent().Peer;
Peer ??= RpcOutboundContext.Current?.Peer ?? RpcInboundContext.GetCurrent().Peer;
var sharedObjects = Peer.SharedObjects;
Id = sharedObjects.NextId(); // NOTE: Id changes on serialization!
var sharedStream = new RpcSharedStream<T>(this);
sharedObjects.Register(sharedStream);
return Id;
}
set {
this.RequireKind(RpcObjectKind.Remote);
lock (_lock) {
this.RequireKind(RpcObjectKind.Remote);
if (Id != 0) {
if (Id == value)
return;
throw Errors.AlreadyInitialized(nameof(SerializedId));
}

Id = value;
Peer = RpcInboundContext.GetCurrent().Peer;
_isRemoteObjectRegistered = true;
Expand All @@ -87,6 +94,30 @@ public long SerializedId {
}
}

[DataMember, MemoryPackOrder(1)]
public string SerializedResetKey {
get {
// This member must be never accessed directly - its only purpose is to be called on serialization
this.RequireKind(RpcObjectKind.Local);
if (!ReferenceEquals(ResetKey, null)) // Already acquired
return ResetKey;

Peer ??= RpcOutboundContext.Current?.Peer ?? RpcInboundContext.GetCurrent().Peer;
return ResetKey = Peer.Hub.Id.Value;
}
set {
this.RequireKind(RpcObjectKind.Remote);
lock (_lock) {
if (!ReferenceEquals(ResetKey, null)) {
if (Equals(ResetKey, value))
return;
throw Errors.AlreadyInitialized(nameof(SerializedResetKey));
}
ResetKey = value;
}
}
}

[JsonIgnore] public override Type ItemType => typeof(T);
[JsonIgnore] public override RpcObjectKind Kind
=> _localSource != null ? RpcObjectKind.Local : RpcObjectKind.Remote;
Expand Down Expand Up @@ -141,7 +172,7 @@ protected internal override Task OnItem(long index, long ackIndex, object? item)
return Task.CompletedTask;

if (index > _nextIndex)
return SendAckFromLock(_nextIndex, true);
return SendResetFromLock(_nextIndex);

// Debug.WriteLine($"{Id}: +#{index} (ack @ {ackIndex})");
_nextIndex++;
Expand All @@ -161,7 +192,7 @@ protected internal override Task OnEnd(long index, Exception? error)
return Task.CompletedTask;

if (index > _nextIndex)
return SendAckFromLock(_nextIndex, true);
return SendResetFromLock(_nextIndex);

// Debug.WriteLine($"{Id}: +{index} (ended!)");
CloseFromLock(error);
Expand All @@ -172,7 +203,7 @@ protected internal override Task OnEnd(long index, Exception? error)
protected override Task OnReconnected(CancellationToken cancellationToken)
{
lock (_lock)
return _remoteChannel != null ? SendAckFromLock(_nextIndex, true) : Task.CompletedTask;
return _remoteChannel != null ? SendResetFromLock(_nextIndex) : Task.CompletedTask;
}

protected override void OnMissing()
Expand Down Expand Up @@ -210,14 +241,17 @@ private void CloseFromLock(Exception? error)
private Task SendCloseFromLock()
{
_nextIndex = int.MaxValue;
return SendAckFromLock(_nextIndex);
return SendAckFromLock(_nextIndex, true);
}

private Task SendResetFromLock(long index)
=> SendAckFromLock(index, true);

private Task SendAckFromLock(long index, bool mustReset = false)
{
// Debug.WriteLine($"{Id}: <- ACK: ({index}, {mustReset})");
return !_isMissing
? Peer!.Hub.SystemCallSender.Ack(Peer, Id, index, mustReset)
? Peer!.Hub.SystemCallSender.Ack(Peer, Id, index, mustReset ? ResetKey ?? "" : null)
: Task.CompletedTask;
}

Expand Down Expand Up @@ -273,7 +307,7 @@ async ValueTask<bool> MoveNext()
try {
if (!_isStarted) {
_isStarted = true;
await _stream.SendAckFromLock(0).ConfigureAwait(false);
await _stream.SendResetFromLock(0).ConfigureAwait(false);
}
if (!await _reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false)) {
_isEnded = true;
Expand Down

0 comments on commit 34cce9e

Please sign in to comment.