Skip to content

Commit

Permalink
Create Address WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Mar 20, 2024
1 parent bc7a90b commit bfb0ab4
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 20 deletions.
2 changes: 2 additions & 0 deletions ArtemisNetCoreClient.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=Anycast/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionStart => SessionStart.Type,
SessionStop => SessionStop.Type,
SessionCloseMessage => SessionCloseMessage.Type,
CreateAddressMessage => CreateAddressMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand Down
22 changes: 22 additions & 0 deletions src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class CreateAddressMessage : Packet
{
public const byte Type = unchecked((byte) -11);
public required string Address { get; init; }
public required RoutingType[] RoutingTypes { get; init; }
public required bool RequiresResponse { get; init; }
public required bool AutoCreated { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteString(Address);
buffer.WriteInt(RoutingTypes.Length);
foreach (var routingType in RoutingTypes)
{
buffer.WriteByte((byte) routingType);
}
buffer.WriteBool(RequiresResponse);
buffer.WriteBool(AutoCreated);
}
}
5 changes: 0 additions & 5 deletions src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,4 @@ public override void Encode(ByteBuffer buffer)
buffer.WriteNullableString(DefaultAddress);
buffer.WriteNullableString(ClientId);
}

public override void Decode(ByteBuffer buffer)
{
throw new NotImplementedException();
}
}
7 changes: 7 additions & 0 deletions src/ArtemisNetCoreClient/Framing/RoutingType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

public enum RoutingType : byte
{
Multicast = 0,
Anycast = 1
}
8 changes: 0 additions & 8 deletions src/ArtemisNetCoreClient/Framing/SessionStart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,4 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;
internal class SessionStart : Packet
{
public const byte Type = 67;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
}
7 changes: 6 additions & 1 deletion src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public interface ISession : IAsyncDisposable;
public interface ISession : IAsyncDisposable
{
public Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, bool autoCreated, CancellationToken cancellationToken);
}
22 changes: 17 additions & 5 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public Session(Transport transport)
try
{
var packet = await _transport.ReceiveAsync(default);
if (packet.IsResponse && _completionSources.TryRemove(packet.CorrelationId, out var tcs))
if (packet is { IsResponse: true } && _completionSources.TryRemove(packet.CorrelationId, out var tcs))
{
tcs.TrySetResult(packet);
}
Expand All @@ -39,21 +39,33 @@ public Session(Transport transport)
});
}

public async Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, bool autoCreated, CancellationToken cancellationToken)
{
var createAddressMessage = new CreateAddressMessage
{
Address = address,
RoutingTypes = routingTypes.ToArray(),
AutoCreated = autoCreated,
RequiresResponse = true
};
_ = await SendBlockingAsync<CreateAddressMessage, NullResponse>(createAddressMessage, 11, cancellationToken);
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
_ = await SendBlockingAsync<SessionCloseMessage, NullResponse>(new SessionCloseMessage(), default);
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), ChannelId, default);
_ = await SendBlockingAsync<SessionCloseMessage, NullResponse>(new SessionCloseMessage(),ChannelId, default);
await _transport.DisposeAsync().ConfigureAwait(false);
}

private async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
private async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest request, long channelId, CancellationToken cancellationToken) where TRequest : Packet
{
var tcs = new TaskCompletionSource<Packet>();

// TODO: Handle scenario when we cannot CorrelationId
_ = _completionSources.TryAdd(request.CorrelationId, tcs);

await _transport.SendAsync(request, ChannelId, cancellationToken);
await _transport.SendAsync(request, channelId, cancellationToken);
var responsePacket = await tcs.Task;
if (responsePacket is TResponse response)
{
Expand Down
6 changes: 5 additions & 1 deletion src/ArtemisNetCoreClient/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ public async Task SendAsync(Packet packet, long channelId, CancellationToken can
await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken).ConfigureAwait(false);
}

public async Task<Packet> ReceiveAsync(CancellationToken cancellationToken)
public async Task<Packet?> ReceiveAsync(CancellationToken cancellationToken)
{
var receiveBuffer = new byte[sizeof(int)];
await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false);

var size = new ByteBuffer(receiveBuffer).ReadInt();
if (size == 0)
{
return null;
}

var buffer = new byte[size];
_ = await socket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);
Expand Down
19 changes: 19 additions & 0 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client.Tests;

public class SessionTests
Expand All @@ -21,4 +23,21 @@ public async Task should_establish_session()
Assert.IsNotNull(session);
await session.DisposeAsync();
}

[Test, Ignore("WIP")]
public async Task should_create_address()
{
// Arrange
var connectionFactory = new SessionFactory();
var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});

// Act && Assert
await session.CreateAddress("myaddress", Enumerable.Empty<RoutingType>(), false, default);
}
}

0 comments on commit bfb0ab4

Please sign in to comment.