Skip to content

Commit

Permalink
Clean up session
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Mar 16, 2024
1 parent 9b2b7d1 commit 7b273b3
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 65 deletions.
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
CreateSessionMessageV2 => CreateSessionMessageV2.Type,
SessionStart => SessionStart.Type,
SessionStop => SessionStop.Type,
SessionCloseMessage => SessionCloseMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ internal class CreateSessionResponseMessage : Packet
{
public const byte Type = 31;
public int ServerVersion { get; private set; }

public override void Encode(ByteBuffer buffer)
{
throw new NotImplementedException();
}

public override void Decode(ByteBuffer buffer)
{
Expand Down
11 changes: 2 additions & 9 deletions src/ArtemisNetCoreClient/Framing/NullResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class NullResponse : Packet
{
public const byte Type = 68;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
public const byte Type = 21;
public override bool IsResponse => true;
}
15 changes: 12 additions & 3 deletions src/ArtemisNetCoreClient/Framing/Packet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;

internal abstract class Packet
{
public abstract void Encode(ByteBuffer buffer);

public abstract void Decode(ByteBuffer buffer);
public virtual long CorrelationId => -1;
public virtual bool IsResponse => false;

public virtual void Encode(ByteBuffer buffer)
{

}

public virtual void Decode(ByteBuffer buffer)
{

}
}
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/Framing/SessionCloseMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionCloseMessage : Packet
{
public const byte Type = 69;
}
8 changes: 0 additions & 8 deletions src/ArtemisNetCoreClient/Framing/SessionStop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,4 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;
internal class SessionStop : Packet
{
public const byte Type = 68;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
}
41 changes: 1 addition & 40 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,3 @@
using System.Collections.Concurrent;
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public interface ISession : IAsyncDisposable;

internal class Session : ISession
{
private readonly Transport _transport;

private ConcurrentDictionary<long, TaskCompletionSource<Packet>> _completionSources = new();

public Session(Transport transport)
{
_transport = transport;

_ = Task.Run(async () =>
{
while (true)
{
var packet = await _transport.ReceiveAsync(default);

}
});
}

public async ValueTask DisposeAsync()
{

await _transport.SendAsync(new SessionStop(), ChannelId, default);

await _transport.DisposeAsync().ConfigureAwait(false);
}

public long ChannelId { get; init; }

public async Task StartAsync(CancellationToken cancellationToken)
{
await _transport.SendAsync(new SessionStart(), ChannelId, cancellationToken);
}
}
public interface ISession : IAsyncDisposable;
75 changes: 75 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System.Collections.Concurrent;
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

internal class Session : ISession
{
private readonly Transport _transport;

private readonly ConcurrentDictionary<long, TaskCompletionSource<Packet>> _completionSources = new();

public Session(Transport transport)
{
_transport = transport;

// TODO: Clean up while loop on close
_ = Task.Run(async () =>
{
while (true)
{
try
{
var packet = await _transport.ReceiveAsync(default);
if (packet.IsResponse && _completionSources.TryRemove(packet.CorrelationId, out var tcs))
{
tcs.TrySetResult(packet);
}
else
{
// TODO: Handle
}
}
catch (Exception e)
{
// TODO: Handle exception
Console.WriteLine(e);
}
}
});
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
_ = await SendBlockingAsync<SessionCloseMessage, NullResponse>(new SessionCloseMessage(), default);
await _transport.DisposeAsync().ConfigureAwait(false);
}

private async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
{
var tcs = new TaskCompletionSource<Packet>();

// TODO: Handle scenario when we cannot CorrelationId
_ = _completionSources.TryAdd(request.CorrelationId, tcs);

await _transport.SendAsync(request, ChannelId, cancellationToken);
var responsePacket = await tcs.Task;
if (responsePacket is TResponse response)
{
return response;
}
else
{
// TODO: Handle gracefully
throw new ArgumentException($"Expected response {typeof(TResponse).Name} but got {responsePacket.GetType().Name}");
}
}

public long ChannelId { get; init; }

public async Task StartAsync(CancellationToken cancellationToken)
{
await _transport.SendAsync(new SessionStart(), ChannelId, cancellationToken);
}
}
1 change: 1 addition & 0 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ public async Task should_establish_session()

// Assert
Assert.IsNotNull(session);
await session.DisposeAsync();
}
}

0 comments on commit 7b273b3

Please sign in to comment.