Skip to content

Commit

Permalink
feat(port forward): listen on stop signal and report back to client
Browse files Browse the repository at this point in the history
  • Loading branch information
Fredrik Arvidsson committed Aug 13, 2021
1 parent 3e71108 commit edd8903
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/Server/IKubernetesService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IKubernetesService
Task<IEnumerable<Service>>
ListServicesInAllNamespacesAsync(string context);

Task<IAsyncDisposable> PortForwardAsync(
Task<IStreamForwarder> PortForwardAsync(
string context,
PortForward portForward,
CancellationToken cancellationToken = default);
Expand Down
12 changes: 12 additions & 0 deletions src/Server/IStreamForwarder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Port.Server
{
public interface IStreamForwarder : IAsyncDisposable
{
Task WaitUntilStoppedAsync(
CancellationToken cancellation);
}
}
4 changes: 2 additions & 2 deletions src/Server/KubernetesService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async Task<IEnumerable<Service>>
service.Spec.Selector));
}

public async Task<IAsyncDisposable> PortForwardAsync(
public async Task<IStreamForwarder> PortForwardAsync(
string context,
PortForward portForward,
CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -100,7 +100,7 @@ public async Task<IAsyncDisposable> PortForwardAsync(

// The disposables have order dependencies so they need to be
// disposed in the reversed order they where created
return new AsyncDisposables(
return new OrderedDisposableStreamForwarderDecorator(
SpdyStreamForwarder.Start(socketServer, session, portForward),
session,
socketServer);
Expand Down
2 changes: 1 addition & 1 deletion src/Server/Messages/ForwardResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
internal sealed partial class ForwardResponse
{
internal static ForwardResponse CreateStopped() => new()
internal static ForwardResponse WasStopped() => new()
{
Stopped = new Stopped()
};
Expand Down
31 changes: 31 additions & 0 deletions src/Server/OrderedDisposableStreamForwarderDecorator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Port.Server
{
internal sealed class OrderedDisposableStreamForwarderDecorator : IStreamForwarder
{
private readonly IStreamForwarder _streamForwarder;
private readonly IAsyncDisposable _disposable;

public OrderedDisposableStreamForwarderDecorator(IStreamForwarder streamForwarder, params IAsyncDisposable[] disposables)
{
_streamForwarder = streamForwarder;
var asyncDisposables = new List<IAsyncDisposable>
{
streamForwarder
};
asyncDisposables.AddRange(disposables);
_disposable = new AsyncDisposables(asyncDisposables.ToArray());
}

public ValueTask DisposeAsync()
=> _disposable.DisposeAsync();

public Task WaitUntilStoppedAsync(
CancellationToken cancellation)
=> _streamForwarder.WaitUntilStoppedAsync(cancellation);
}
}
77 changes: 48 additions & 29 deletions src/Server/Services/PortForwardService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ internal sealed class PortForwardService : PortForwarder.PortForwarderBase,
private readonly IKubernetesService _kubernetesService;
private readonly CancellationTokenSource _cts = new();
private CancellationToken CancellationToken => _cts.Token;
private IAsyncDisposable _portForwardHandler = new AsyncDisposables();

private readonly ConcurrentDictionary<string, IAsyncDisposable>
private readonly ConcurrentDictionary<string, IStreamForwarder>
_portForwardHandlers =
new();

Expand All @@ -31,19 +30,38 @@ public override async Task PortForward(
{
var portForward = command.ToPortForward();

_portForwardHandler =
await _kubernetesService
.PortForwardAsync(
command.Context,
portForward,
CancellationToken)
using var cancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(
CancellationToken, context.CancellationToken);
var cancellationToken = cancellationTokenSource.Token;

try
{
var portForwardHandler =
await _kubernetesService
.PortForwardAsync(
command.Context,
portForward,
cancellationToken)
.ConfigureAwait(false);

_portForwardHandlers.TryAdd(command.GetId(), portForwardHandler);

await responseStream
.WriteAsync(ForwardResponse.WasForwarded())
.ConfigureAwait(false);

_portForwardHandlers.TryAdd(command.GetId(), _portForwardHandler);
await portForwardHandler.WaitUntilStoppedAsync(cancellationToken)
.ConfigureAwait(false);

await responseStream
.WriteAsync(ForwardResponse.WasForwarded())
.ConfigureAwait(false);
await responseStream
.WriteAsync(ForwardResponse.WasStopped())
.ConfigureAwait(false);
}
catch when (cancellationToken.IsCancellationRequested)
{
context.Status = Status.DefaultCancelled;
}
}

public override async Task<Stopped> StopForwarding(
Expand All @@ -62,25 +80,26 @@ await handler.DisposeAsync()

public async ValueTask DisposeAsync()
{
_cts.Cancel();

while (!_portForwardHandlers.IsEmpty)
using (_cts)
{
await Task.WhenAll(
_portForwardHandlers
.Keys
.Select(
key => _portForwardHandlers
.TryRemove(key, out var handler)
? handler.DisposeAsync()
: ValueTask.CompletedTask)
.Where(
valueTask => !valueTask
.IsCompletedSuccessfully)
.Select(valueTask => valueTask.AsTask()));
}
_cts.Cancel();

_cts.Dispose();
while (!_portForwardHandlers.IsEmpty)
{
await Task.WhenAll(
_portForwardHandlers
.Keys
.Select(
key => _portForwardHandlers
.TryRemove(key, out var handler)
? handler.DisposeAsync()
: ValueTask.CompletedTask)
.Where(
valueTask => !valueTask
.IsCompletedSuccessfully)
.Select(valueTask => valueTask.AsTask()));
}
}
}
}
}
25 changes: 14 additions & 11 deletions src/Server/SpdyStreamForwarder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

namespace Port.Server
{
internal sealed class SpdyStreamForwarder : IAsyncDisposable
internal sealed class SpdyStreamForwarder : IStreamForwarder
{
private readonly INetworkServer _networkServer;
private readonly SpdySession _spdySession;
Expand Down Expand Up @@ -43,7 +43,7 @@ private SpdyStreamForwarder(
_portForward = portForward;
}

internal static IAsyncDisposable Start(
internal static IStreamForwarder Start(
INetworkServer networkServer,
SpdySession spdySession,
PortForward portForward)
Expand All @@ -52,7 +52,7 @@ internal static IAsyncDisposable Start(
.Start();
}

private IAsyncDisposable Start()
private IStreamForwarder Start()
{
var previousStatus = Interlocked.Exchange(ref _status, Started);
if (previousStatus == Started)
Expand Down Expand Up @@ -339,19 +339,22 @@ await localSocket

public async ValueTask DisposeAsync()
{
_cancellationTokenSource.Cancel(false);

try
using (_cancellationTokenSource)
{
_cancellationTokenSource.Cancel(false);

await Task.WhenAll(_backgroundTasks)
.ConfigureAwait(false);

}
catch (OperationCanceledException)
{
}
}

_cancellationTokenSource.Dispose();
public Task WaitUntilStoppedAsync(
CancellationToken cancellation)
{
var stopped = new TaskCompletionSource();
CancellationToken.Register(() => stopped.TrySetResult());
cancellation.Register(() => stopped.TrySetCanceled(cancellation));
return stopped.Task;
}
}
}

0 comments on commit edd8903

Please sign in to comment.