Skip to content

Commit

Permalink
Create ActiveMQ Artemis Session
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Mar 13, 2024
1 parent 5249557 commit 43e7827
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 8 deletions.
9 changes: 3 additions & 6 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
using System.Net.Sockets;

namespace ActiveMQ.Artemis.Core.Client;

public interface ISession : IAsyncDisposable;

internal class Session(Socket socket) : ISession
internal class Session(Transport socket) : ISession
{
public ValueTask DisposeAsync()
public async ValueTask DisposeAsync()
{
socket.Dispose();
return ValueTask.CompletedTask;
await socket.DisposeAsync().ConfigureAwait(false);
}
}
39 changes: 37 additions & 2 deletions src/ArtemisNetCoreClient/SessionFactory.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Net;
using System.Net.Sockets;
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

Expand Down Expand Up @@ -41,7 +42,41 @@ public async Task<ISession> CreateAsync(Endpoint endpoint, CancellationToken can
{
throw exception ?? new SocketException((int)SocketError.AddressNotAvailable);
}

var createSessionMessageV2 = new CreateSessionMessageV2
{
Name = Guid.NewGuid().ToString(),
SessionChannelId = 1,
Version = 135,
Username = endpoint.User,
Password = endpoint.Password,
MinLargeMessageSize = 100 * 1024,
Xa = false,
AutoCommitSends = true,
AutoCommitAcks = true,
PreAcknowledge = false,
WindowSize = -1,
DefaultAddress = null,
ClientId = null,
};

// var byteBuffer = new ByteBuffer();
// Codec.Encode(byteBuffer, createSessionMessageV2, 1);
// _ = await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken);

var transport = new Transport(socket);

await transport.SendAsync(createSessionMessageV2, cancellationToken);

return new Session(socket);
var receivedPacket = await transport.ReceiveAsync(cancellationToken);

if (receivedPacket is CreateSessionResponseMessage)
{
return new Session(transport);
}
else
{
throw new InvalidOperationException("Received invalid response from the broker");
}
}
}
}
38 changes: 38 additions & 0 deletions src/ArtemisNetCoreClient/Transport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Net.Sockets;
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

internal class Transport(Socket socket) : IAsyncDisposable
{
public async Task SendAsync(Packet packet, CancellationToken cancellationToken)
{
var byteBuffer = new ByteBuffer();
Codec.Encode(byteBuffer, packet, 1);
await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken).ConfigureAwait(false);
}

public async Task<Packet> ReceiveAsync(CancellationToken cancellationToken)
{
var receiveBuffer = new byte[sizeof(int)];
while (0 == await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false))
{
}

var size = new ByteBuffer(receiveBuffer).ReadInt();

var buffer = new byte[size];
_ = await socket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);

var payloadBuffer = new ByteBuffer(buffer);

var (packet, _) = Codec.Decode(payloadBuffer);
return packet;
}

public ValueTask DisposeAsync()
{
socket.Dispose();
return ValueTask.CompletedTask;
}
}
23 changes: 23 additions & 0 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace ActiveMQ.Artemis.Core.Client.Tests;

public class SessionTests
{
[Test]
public async Task should_establish_session()
{
// Arrange
var connectionFactory = new SessionFactory();

// Act
var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});

// Assert
Assert.IsNotNull(session);
}
}

0 comments on commit 43e7827

Please sign in to comment.