Skip to content

Commit

Permalink
feat: Rpc streaming improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Sep 12, 2023
1 parent 4ddd131 commit d341a45
Show file tree
Hide file tree
Showing 17 changed files with 228 additions and 138 deletions.
2 changes: 1 addition & 1 deletion src/Stl.Rpc/Infrastructure/RpcCallTrackers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public RpcInboundCall GetOrRegister(RpcInboundCall call)

public sealed class RpcOutboundCallTracker : RpcCallTracker<RpcOutboundCall>
{
public static readonly TimeSpan AbortCheckPeriod = TimeSpan.FromSeconds(1);
public static TimeSpan AbortCheckPeriod { get; set; } = TimeSpan.FromSeconds(1);

private long _lastId;

Expand Down
18 changes: 8 additions & 10 deletions src/Stl.Rpc/Infrastructure/RpcObjectTrackers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ namespace Stl.Rpc.Infrastructure;

public abstract class RpcObjectTracker
{
public static TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(15);

private RpcPeer _peer = null!;

public RpcPeer Peer {
Expand All @@ -27,7 +25,8 @@ public virtual void Initialize(RpcPeer peer)

public class RpcRemoteObjectTracker : RpcObjectTracker, IEnumerable<IRpcObject>
{
public static readonly GCHandlePool GCHandlePool = new(new GCHandlePool.Options() {
public static TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(15);
public static GCHandlePool GCHandlePool { get; set; } = new(new GCHandlePool.Options() {
Capacity = HardwareInfo.GetProcessorCountPo2Factor(16),
});

Expand Down Expand Up @@ -130,7 +129,9 @@ private long[] GetAliveObjectIdsAndReleaseDeadHandles()

public sealed class RpcSharedObjectTracker : RpcObjectTracker, IEnumerable<IRpcSharedObject>
{
public static readonly TimeSpan AbortCheckPeriod = TimeSpan.FromSeconds(1);
public static TimeSpan ReleasePeriod { get; set; } = TimeSpan.FromSeconds(10);
public static TimeSpan ReleaseTimeout { get; set; }= TimeSpan.FromSeconds(65);
public static TimeSpan AbortCheckPeriod { get; set; } = TimeSpan.FromSeconds(1);

private long _lastId;
private readonly ConcurrentDictionary<long, IRpcSharedObject> _objects = new();
Expand Down Expand Up @@ -166,15 +167,12 @@ public async Task Maintain(CancellationToken cancellationToken)
try {
var hub = Peer.Hub;
var clock = hub.Clock;
var halfKeepAlivePeriod = KeepAlivePeriod.Multiply(0.5);
var keepAliveTimeout = KeepAlivePeriod.Multiply(2.1);
await clock.Delay(halfKeepAlivePeriod, cancellationToken).ConfigureAwait(false);
while (true) {
var minLastKeepAliveAt = CpuTimestamp.Now - keepAliveTimeout;
await clock.Delay(ReleasePeriod, cancellationToken).ConfigureAwait(false);
var minLastKeepAliveAt = CpuTimestamp.Now - ReleaseTimeout;
foreach (var (_, obj) in _objects)
if (obj.LastKeepAliveAt < minLastKeepAliveAt && Unregister(obj))
TryDispose(obj);
await clock.Delay(KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
}
}
catch {
Expand Down Expand Up @@ -219,7 +217,7 @@ public async Task<int> Abort(Exception error)

// Private methods

public void TryDispose(IRpcSharedObject obj)
public static void TryDispose(IRpcSharedObject obj)
{
if (obj is IAsyncDisposable ad)
_ = ad.DisposeAsync();
Expand Down
34 changes: 22 additions & 12 deletions src/Stl.Rpc/Infrastructure/RpcSharedStream.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System.Diagnostics;
using Stl.Channels;
using Stl.Rpc.Internal;

namespace Stl.Rpc.Infrastructure;

#pragma warning disable MA0042

public abstract class RpcSharedStream(RpcStream stream) : WorkerBase, IRpcSharedObject
{
protected static readonly Exception NoError = new();
Expand Down Expand Up @@ -44,8 +47,6 @@ public sealed class RpcSharedStream<T>(RpcStream stream) : RpcSharedStream(strea
SingleWriter = true,
});

private long _lastAckIndex;

public new RpcStream<T> Stream { get; } = (RpcStream<T>)stream;

protected override async Task DisposeAsyncCore()
Expand Down Expand Up @@ -86,19 +87,29 @@ protected override async Task OnRun(CancellationToken cancellationToken)
(long NextIndex, bool MustReset) ack;
if (nextAckTask.IsCompleted)
ack = nextAckTask.Result;
else {
else
// Debug.WriteLine("-> Waiting for ACK");
ack = await nextAckTask.ConfigureAwait(false);

// Skip accumulated acknowledgements
while (true) {
// Debug.WriteLine($"-> ACK: {ack}");
if (ack.NextIndex == long.MaxValue)
return; // Client tells us it's done w/ this stream

if (ack.MustReset)
index = ack.NextIndex;

nextAckTask = ackReader.ReadAsync(cancellationToken);
if (!nextAckTask.IsCompleted)
break;

ack = nextAckTask.Result;
}
// Debug.WriteLine($"-> ACK: {ack}");
if (ack.NextIndex == long.MaxValue)
return; // The only point we exit is when the client tells us it's done

nextAckTask = ackReader.ReadAsync(cancellationToken);
// Process the latest acknowledgement
var ackIndex = ack.NextIndex + Stream.AckDistance;
var maxIndex = ack.NextIndex + Stream.AdvanceDistance;
if (ack.MustReset)
index = ack.NextIndex;

// 2. Send as much as we can until we'll need to await for the next acknowledgement
while (index < maxIndex) {
Expand Down Expand Up @@ -167,11 +178,10 @@ private Task SendInvalidPosition(long index, long ackIndex)
private Task Send(long index, long ackIndex, Result<T> item)
{
// Debug.WriteLine($"Sent item: {index}, {ackIndex}");
_lastAckIndex = ackIndex;
if (item.IsValue(out var value))
return _systemCallSender.StreamItem(Peer, Id, index, ackIndex, value);
return _systemCallSender.Item(Peer, Id, index, (int)(ackIndex - index), value);

var error = ReferenceEquals(item.Error, NoError) ? null : item.Error;
return _systemCallSender.StreamEnd(Peer, Id, index, error);
return _systemCallSender.End(Peer, Id, index, error);
}
}
34 changes: 19 additions & 15 deletions src/Stl.Rpc/Infrastructure/RpcSystemCallSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ public sealed class RpcSystemCallSender(IServiceProvider services)
private RpcMethodDef? _notFoundMethodDef;
private RpcMethodDef? _keepAliveMethodDef;
private RpcMethodDef? _missingObjectsMethodDef;
private RpcMethodDef? _streamAckMethodDef;
private RpcMethodDef? _streamItemMethodDef;
private RpcMethodDef? _streamEndMethodDef;
private RpcMethodDef? _ackMethodDef;
private RpcMethodDef? _itemMethodDef;
private RpcMethodDef? _endMethodDef;

public IRpcSystemCalls Client => _client
??= Services.GetRequiredService<IRpcSystemCalls>();
Expand All @@ -33,12 +33,12 @@ public sealed class RpcSystemCallSender(IServiceProvider services)
??= SystemCallsServiceDef.Methods.Single(m => Equals(m.Method.Name, nameof(IRpcSystemCalls.KeepAlive)));
public RpcMethodDef MissingObjectsMethodDef => _missingObjectsMethodDef
??= SystemCallsServiceDef.Methods.Single(m => Equals(m.Method.Name, nameof(IRpcSystemCalls.MissingObjects)));
public RpcMethodDef StreamAckMethodDef => _streamAckMethodDef
??= SystemCallsServiceDef.Methods.Single(m => Equals(m.Method.Name, nameof(IRpcSystemCalls.StreamAck)));
public RpcMethodDef StreamItemMethodDef => _streamItemMethodDef
??= SystemCallsServiceDef.Methods.Single(m => Equals(m.Method.Name, nameof(IRpcSystemCalls.StreamItem)));
public RpcMethodDef StreamEndMethodDef => _streamEndMethodDef
??= SystemCallsServiceDef.Methods.Single(m => Equals(m.Method.Name, nameof(IRpcSystemCalls.StreamEnd)));
public RpcMethodDef AckMethodDef => _ackMethodDef
??= SystemCallsServiceDef.Methods.Single(m => Equals(m.Method.Name, nameof(IRpcSystemCalls.Ack)));
public RpcMethodDef ItemMethodDef => _itemMethodDef
??= SystemCallsServiceDef.Methods.Single(m => Equals(m.Method.Name, nameof(IRpcSystemCalls.I)));
public RpcMethodDef EndMethodDef => _endMethodDef
??= SystemCallsServiceDef.Methods.Single(m => Equals(m.Method.Name, nameof(IRpcSystemCalls.End)));

public Task Complete<TResult>(RpcPeer peer, long callId,
Result<TResult> result, bool allowPolymorphism,
Expand Down Expand Up @@ -91,6 +91,8 @@ public Task Cancel(RpcPeer peer, long callId, List<RpcHeader>? headers = null)
return call.SendNoWait(false);
}

// Objects

public Task KeepAlive(RpcPeer peer, long[] objectIds, List<RpcHeader>? headers = null)
{
var context = new RpcOutboundContext(headers) {
Expand All @@ -109,34 +111,36 @@ public Task MissingObjects(RpcPeer peer, long[] objectIds, List<RpcHeader>? head
return call.SendNoWait(false);
}

public Task StreamAck(RpcPeer peer, long objectId, long nextIndex, bool mustReset, List<RpcHeader>? headers = null)
// Streams

public Task Ack(RpcPeer peer, long objectId, long nextIndex, bool mustReset, List<RpcHeader>? headers = null)
{
var context = new RpcOutboundContext(headers) {
Peer = peer,
RelatedCallId = objectId,
};
var call = context.PrepareCall(StreamAckMethodDef, ArgumentList.New(nextIndex, mustReset))!;
var call = context.PrepareCall(AckMethodDef, ArgumentList.New(nextIndex, mustReset))!;
return call.SendNoWait(false);
}

public Task StreamItem<TItem>(RpcPeer peer, long objectId, long index, long ackIndex, TItem result, List<RpcHeader>? headers = null)
public Task Item<TItem>(RpcPeer peer, long objectId, long index, int ackOffset, TItem result, List<RpcHeader>? headers = null)
{
var context = new RpcOutboundContext(headers) {
Peer = peer,
RelatedCallId = objectId,
};
var call = context.PrepareCall(StreamItemMethodDef, ArgumentList.New(index, ackIndex, result))!;
var call = context.PrepareCall(ItemMethodDef, ArgumentList.New(index, ackOffset, result))!;
return call.SendNoWait(true);
}

public Task StreamEnd(RpcPeer peer, long objectId, long index, Exception? error, List<RpcHeader>? headers = null)
public Task End(RpcPeer peer, long objectId, long index, Exception? error, List<RpcHeader>? headers = null)
{
var context = new RpcOutboundContext(headers) {
Peer = peer,
RelatedCallId = objectId,
};
// An optimized version of Client.Error(result):
var call = context.PrepareCall(StreamEndMethodDef, ArgumentList.New(index, error.ToExceptionInfo()))!;
var call = context.PrepareCall(EndMethodDef, ArgumentList.New(index, error.ToExceptionInfo()))!;
return call.SendNoWait(false);
}
}
16 changes: 8 additions & 8 deletions src/Stl.Rpc/Infrastructure/RpcSystemCalls.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ public interface IRpcSystemCalls : IRpcSystemService
Task<RpcNoWait> MissingObjects(long[] objectIds);

// Streams
Task<RpcNoWait> StreamAck(long nextIndex, bool mustReset);
Task<RpcNoWait> StreamItem(long index, long ackIndex, object? item);
Task<RpcNoWait> StreamEnd(long index, ExceptionInfo error);
Task<RpcNoWait> Ack(long nextIndex, bool mustReset);
Task<RpcNoWait> I(long index, int ackOffset, object? item);
Task<RpcNoWait> End(long index, ExceptionInfo error);
}

public class RpcSystemCalls(IServiceProvider services)
: RpcServiceBase(services), IRpcSystemCalls, IRpcDynamicCallHandler
{
private static readonly Symbol OkMethodName = nameof(Ok);
private static readonly Symbol StreamItemMethodName = nameof(StreamItem);
private static readonly Symbol StreamItemMethodName = nameof(I);

public static readonly Symbol Name = "$sys";

Expand Down Expand Up @@ -77,7 +77,7 @@ public Task<RpcNoWait> MissingObjects(long[] objectIds)
return RpcNoWait.Tasks.Completed;
}

public async Task<RpcNoWait> StreamAck(long nextIndex, bool mustReset)
public async Task<RpcNoWait> Ack(long nextIndex, bool mustReset)
{
var context = RpcInboundContext.GetCurrent();
var peer = context.Peer;
Expand All @@ -89,17 +89,17 @@ public async Task<RpcNoWait> StreamAck(long nextIndex, bool mustReset)
return default;
}

public Task<RpcNoWait> StreamItem(long index, long ackIndex, object? item)
public Task<RpcNoWait> I(long index, int ackOffset, object? item)
{
var context = RpcInboundContext.GetCurrent();
var peer = context.Peer;
var objectId = context.Message.RelatedId;
return peer.RemoteObjects.Get(objectId) is RpcStream stream
? RpcNoWait.Tasks.From(stream.OnItem(index, ackIndex, item))
? RpcNoWait.Tasks.From(stream.OnItem(index, index + ackOffset, item))
: RpcNoWait.Tasks.Completed;
}

public Task<RpcNoWait> StreamEnd(long index, ExceptionInfo error)
public Task<RpcNoWait> End(long index, ExceptionInfo error)
{
var context = RpcInboundContext.GetCurrent();
var peer = context.Peer;
Expand Down
4 changes: 2 additions & 2 deletions src/Stl.Rpc/RpcStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ internal IAsyncEnumerable<T> GetLocalSource()
}

protected internal override ArgumentList CreateStreamItemArguments()
=> ArgumentList.New(0L, 0L, default(T));
=> ArgumentList.New<long, int, T>(0L, 0, default!);

protected internal override Task OnItem(long index, long ackIndex, object? item)
{
Expand Down Expand Up @@ -224,7 +224,7 @@ private Task SendAck(long index, bool mustReset = false)

// Debug.WriteLine($"ACK: ({index}, {mustReset})");
var peer = Peer!;
return peer.Hub.SystemCallSender.StreamAck(peer, Id, index, mustReset);
return peer.Hub.SystemCallSender.Ack(peer, Id, index, mustReset);
}

// Nested types
Expand Down
2 changes: 1 addition & 1 deletion src/Stl.Testing/TestChannelPair.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public TestChannelPair(string name, ITestOutputHelper? @out = null, int capacity
Out.WriteLine($"{Name}.Channel2 -> {m}");
return m;
},
ChannelCompletionMode.Full
ChannelCopyMode.CopyAllSilently
);
Channel1 = cp1.Channel1;
Channel2 = cp2.Channel2;
Expand Down
10 changes: 0 additions & 10 deletions src/Stl/Channels/ChannelCompletionMode.cs

This file was deleted.

12 changes: 12 additions & 0 deletions src/Stl/Channels/ChannelCopyMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Stl.Channels;

[Flags]
public enum ChannelCopyMode
{
CopyCompletion = 1,
CopyError = 2,
CopyCancellation = 4,
CopyAll = CopyCompletion + CopyError + CopyCancellation,
Silently = 64,
CopyAllSilently = CopyAll + Silently,
}
Loading

0 comments on commit d341a45

Please sign in to comment.