Skip to content

Commit

Permalink
Session start
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Mar 14, 2024
1 parent 43e7827 commit a2a7416
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 14 deletions.
3 changes: 3 additions & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
var type = packet switch
{
CreateSessionMessageV2 => CreateSessionMessageV2.Type,
SessionStart => SessionStart.Type,
SessionStop => SessionStop.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand All @@ -27,6 +29,7 @@ public static (Packet packet, long channelId) Decode(ByteBuffer buffer)
Packet packet = type switch
{
CreateSessionResponseMessage.Type => new CreateSessionResponseMessage(),
NullResponse.Type => new NullResponse(),
_ => throw new ArgumentOutOfRangeException($"Type {type} is not supported for decoding")
};

Expand Down
14 changes: 14 additions & 0 deletions src/ArtemisNetCoreClient/Framing/NullResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class NullResponse : Packet
{
public const byte Type = 68;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
}
14 changes: 14 additions & 0 deletions src/ArtemisNetCoreClient/Framing/SessionStart.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionStart : Packet
{
public const byte Type = 67;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
}
14 changes: 14 additions & 0 deletions src/ArtemisNetCoreClient/Framing/SessionStop.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionStop : Packet
{
public const byte Type = 68;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
}
35 changes: 33 additions & 2 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,42 @@
using System.Collections.Concurrent;
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public interface ISession : IAsyncDisposable;

internal class Session(Transport socket) : ISession
internal class Session : ISession
{
private readonly Transport _transport;

private ConcurrentDictionary<long, TaskCompletionSource<Packet>> _completionSources = new();

public Session(Transport transport)
{
_transport = transport;

_ = Task.Run(async () =>
{
while (true)
{
var packet = await _transport.ReceiveAsync(default);
}
});
}

public async ValueTask DisposeAsync()
{
await socket.DisposeAsync().ConfigureAwait(false);

await _transport.SendAsync(new SessionStop(), ChannelId, default);

await _transport.DisposeAsync().ConfigureAwait(false);
}

public long ChannelId { get; init; }

public async Task StartAsync(CancellationToken cancellationToken)
{
await _transport.SendAsync(new SessionStart(), ChannelId, cancellationToken);
}
}
15 changes: 8 additions & 7 deletions src/ArtemisNetCoreClient/SessionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task<ISession> CreateAsync(Endpoint endpoint, CancellationToken can
var createSessionMessageV2 = new CreateSessionMessageV2
{
Name = Guid.NewGuid().ToString(),
SessionChannelId = 1,
SessionChannelId = 10,
Version = 135,
Username = endpoint.User,
Password = endpoint.Password,
Expand All @@ -59,20 +59,21 @@ public async Task<ISession> CreateAsync(Endpoint endpoint, CancellationToken can
DefaultAddress = null,
ClientId = null,
};

// var byteBuffer = new ByteBuffer();
// Codec.Encode(byteBuffer, createSessionMessageV2, 1);
// _ = await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken);

var transport = new Transport(socket);

await transport.SendAsync(createSessionMessageV2, cancellationToken);
await transport.SendAsync(createSessionMessageV2, 1, cancellationToken);

var receivedPacket = await transport.ReceiveAsync(cancellationToken);

if (receivedPacket is CreateSessionResponseMessage)
{
return new Session(transport);
var session = new Session(transport)
{
ChannelId = createSessionMessageV2.SessionChannelId
};
await session.StartAsync(cancellationToken);
return session;
}
else
{
Expand Down
8 changes: 3 additions & 5 deletions src/ArtemisNetCoreClient/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@ namespace ActiveMQ.Artemis.Core.Client;

internal class Transport(Socket socket) : IAsyncDisposable
{
public async Task SendAsync(Packet packet, CancellationToken cancellationToken)
public async Task SendAsync(Packet packet, long channelId, CancellationToken cancellationToken)
{
var byteBuffer = new ByteBuffer();
Codec.Encode(byteBuffer, packet, 1);
Codec.Encode(byteBuffer, packet, channelId);
await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken).ConfigureAwait(false);
}

public async Task<Packet> ReceiveAsync(CancellationToken cancellationToken)
{
var receiveBuffer = new byte[sizeof(int)];
while (0 == await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false))
{
}
await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false);

var size = new ByteBuffer(receiveBuffer).ReadInt();

Expand Down

0 comments on commit a2a7416

Please sign in to comment.