Skip to content

Commit

Permalink
feat: add unit test for RtmpChunkMessageAggregatorService
Browse files Browse the repository at this point in the history
  • Loading branch information
josephnhtam committed Oct 4, 2024
1 parent 72f140a commit e60962f
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 190 deletions.
Original file line number Diff line number Diff line change
@@ -1,223 +1,92 @@
using AutoFixture;
using FluentAssertions;
using LiveStreamingServerNet.Rtmp.Internal;
using LiveStreamingServerNet.Networking.Contracts;
using LiveStreamingServerNet.Rtmp.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Contracts;
using LiveStreamingServerNet.Rtmp.Internal.RtmpHeaders;
using LiveStreamingServerNet.Rtmp.Internal.Services;
using LiveStreamingServerNet.Rtmp.Internal.Services.Contracts;
using LiveStreamingServerNet.Rtmp.Server.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Server.Internal.RtmpEventHandlers;
using LiveStreamingServerNet.Rtmp.Server.Internal.RtmpEvents;
using LiveStreamingServerNet.Rtmp.Server.Internal.Services.Contracts;
using LiveStreamingServerNet.Rtmp.Test.Utilities;
using LiveStreamingServerNet.Utilities.Buffers;
using LiveStreamingServerNet.Utilities.Buffers.Configurations;
using LiveStreamingServerNet.Utilities.Buffers.Contracts;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NSubstitute;

namespace LiveStreamingServerNet.Rtmp.Server.Test.RtmpEventHandlers
{
public class RtmpChunkEventHandlerTest
{
[Theory]
[MemberData(nameof(CreateTestParameters))]
internal async Task Handle_Should_HandleChunksCorrectly<TRtmpChunkMessageHeader>(
IRtmpClientSessionContext clientContext,
IRtmpChunkStreamContext streamContext,
Stream stream,
byte[] payload,
RtmpChunkBasicHeader basicHeader,
TRtmpChunkMessageHeader messageHeader)
where TRtmpChunkMessageHeader : struct, IRtmpChunkMessageHeader
private readonly IRtmpMessageDispatcher<IRtmpClientSessionContext> _dispatcher;
private readonly IRtmpProtocolControlService _protocolControl;
private readonly IRtmpChunkMessageAggregatorService _chunkMessageAggregator;
private readonly ILogger<RtmpChunkEventHandler> _logger;
private readonly RtmpChunkEvent _event;
private readonly Fixture _fixture;
private readonly INetworkStreamReader _networkStreamReader;
private readonly IRtmpClientSessionContext _clientSessionContext;
private readonly IRtmpChunkStreamContext _chunkStreamContext;
private readonly RtmpChunkEventHandler _sut;

public RtmpChunkEventHandlerTest()
{
// Arrange
var dataBufferPool = new DataBufferPool(Options.Create(new DataBufferPoolConfiguration()));
var dispatcher = Substitute.For<IRtmpMessageDispatcher<IRtmpClientSessionContext>>();
var protocolControl = Substitute.For<IRtmpProtocolControlService>();
var logger = Substitute.For<ILogger<RtmpChunkEventHandler>>();

using IDataBuffer resultPayloadBuffer = new DataBuffer();

var tcs = new TaskCompletionSource();
dispatcher.DispatchAsync(streamContext, clientContext, Arg.Any<CancellationToken>())
.Returns(true)
.AndDoes(x =>
{
streamContext.PayloadBuffer!.ReadAndWriteTo(resultPayloadBuffer, streamContext.PayloadBuffer.Size);
resultPayloadBuffer.MoveTo(0);
tcs.SetResult();
});

var networkStream = new NetworkStream(stream);
var chunkMessageAggregator = new RtmpChunkMessageAggregatorService(dataBufferPool);
var sut = new RtmpChunkEventHandler(dispatcher, protocolControl, chunkMessageAggregator, logger);
_fixture = new Fixture();
_networkStreamReader = Substitute.For<INetworkStreamReader>();
_clientSessionContext = Substitute.For<IRtmpClientSessionContext>();
_chunkStreamContext = Substitute.For<IRtmpChunkStreamContext>();
_dispatcher = Substitute.For<IRtmpMessageDispatcher<IRtmpClientSessionContext>>();
_protocolControl = Substitute.For<IRtmpProtocolControlService>();
_chunkMessageAggregator = Substitute.For<IRtmpChunkMessageAggregatorService>();
_logger = Substitute.For<ILogger<RtmpChunkEventHandler>>();

_event = new RtmpChunkEvent { ClientContext = _clientSessionContext, NetworkStream = _networkStreamReader };
_clientSessionContext.GetChunkStreamContext(Arg.Any<uint>()).Returns(_chunkStreamContext);

_sut = new RtmpChunkEventHandler(_dispatcher, _protocolControl, _chunkMessageAggregator, _logger);
}

while (!tcs.Task.IsCompleted)
{
// Act
var @event = new RtmpChunkEvent { ClientContext = clientContext, NetworkStream = networkStream };
var result = await sut.Handle(@event, default);
[Fact]
internal async Task Handle_Should_DispatchRtmpMessage()
{
// Arrange
_chunkMessageAggregator.AggregateChunkMessagesAsync(_networkStreamReader, _clientSessionContext, Arg.Any<CancellationToken>())
.Returns(new RtmpChunkMessageAggregationResult(true, _fixture.Create<int>(), _chunkStreamContext));

// Assert
result.Succeeded.Should().Be(true);
}
// Act
await _sut.Handle(_event, default);

// Assert
if (messageHeader is RtmpChunkMessageHeaderType0 headerType0)
{
streamContext.ChunkStreamId.Should().Be(basicHeader.ChunkStreamId);
streamContext.MessageHeader.Timestamp.Should().Be(headerType0.Timestamp);
streamContext.MessageHeader.MessageLength.Should().Be(headerType0.MessageLength);
streamContext.MessageHeader.MessageTypeId.Should().Be(headerType0.MessageTypeId);
streamContext.MessageHeader.MessageStreamId.Should().Be(headerType0.MessageStreamId);
streamContext.MessageHeader.HasExtendedTimestamp.Should().Be(headerType0.HasExtendedTimestamp());
}
else if (messageHeader is RtmpChunkMessageHeaderType1 headerType1)
{
streamContext.ChunkStreamId.Should().Be(basicHeader.ChunkStreamId);
streamContext.MessageHeader.Timestamp.Should().Be(headerType1.TimestampDelta);
streamContext.MessageHeader.MessageLength.Should().Be(headerType1.MessageLength);
streamContext.MessageHeader.MessageTypeId.Should().Be(headerType1.MessageTypeId);
streamContext.MessageHeader.HasExtendedTimestamp.Should().Be(headerType1.HasExtendedTimestamp());
}
else if (messageHeader is RtmpChunkMessageHeaderType2 headerType2)
{
streamContext.ChunkStreamId.Should().Be(basicHeader.ChunkStreamId);
streamContext.MessageHeader.Timestamp.Should().Be(headerType2.TimestampDelta);
streamContext.MessageHeader.HasExtendedTimestamp.Should().Be(headerType2.HasExtendedTimestamp());
}

_ = dispatcher.Received(1).DispatchAsync(streamContext, clientContext, Arg.Any<CancellationToken>());

resultPayloadBuffer.UnderlyingBuffer.Take(resultPayloadBuffer.Size)
.Should().BeEquivalentTo(payload);

streamContext.PayloadBuffer.Should().BeNull();
_ = _dispatcher.Received(1).DispatchAsync(_chunkStreamContext, _clientSessionContext, Arg.Any<CancellationToken>());
}

public static IEnumerable<object[]> CreateTestParameters()
[Fact]
internal async Task Handle_Should_ResetChunkStreamContext()
{
var fixture = new Fixture();

{
var chunkSize = 128u;
var chunkStreamId = Helpers.CreateRandomChunkStreamId();

var streamContext = new RtmpChunkStreamContext(chunkStreamId);
var clientContext = Substitute.For<IRtmpClientSessionContext>();
clientContext.InChunkSize.Returns(chunkSize);
(clientContext as IRtmpChunkStreamContextProvider).InChunkSize.Returns(chunkSize);
clientContext.GetChunkStreamContext(chunkStreamId).Returns(streamContext);

var payload = fixture.CreateMany<byte>(1000).ToArray();
var basicHeader = new RtmpChunkBasicHeader(0, chunkStreamId);
var messageHeader = new RtmpChunkMessageHeaderType0(
fixture.Create<uint>(), payload.Length, fixture.Create<byte>(), fixture.Create<uint>());

yield return new object[]
{
clientContext,
streamContext,
CreateStream(basicHeader, messageHeader, payload, chunkSize),
payload,
basicHeader,
messageHeader
};
}

{
var chunkSize = 2000u;
var chunkStreamId = Helpers.CreateRandomChunkStreamId();

var streamContext = new RtmpChunkStreamContext(chunkStreamId);
var clientContext = Substitute.For<IRtmpClientSessionContext>();
clientContext.InChunkSize.Returns(chunkSize);
(clientContext as IRtmpChunkStreamContextProvider).InChunkSize.Returns(chunkSize);
clientContext.GetChunkStreamContext(chunkStreamId).Returns(streamContext);

var payload = fixture.CreateMany<byte>(1000).ToArray();
var basicHeader = new RtmpChunkBasicHeader(0, chunkStreamId);
var messageHeader = new RtmpChunkMessageHeaderType0(
fixture.Create<uint>(), payload.Length, fixture.Create<byte>(), fixture.Create<uint>());

yield return new object[]
{
clientContext,
streamContext,
CreateStream(basicHeader, messageHeader, payload, chunkSize),
payload,
basicHeader,
messageHeader
};
}

{
var chunkSize = 500u;
var chunkStreamId = Helpers.CreateRandomChunkStreamId();

var streamContext = new RtmpChunkStreamContext(chunkStreamId);
var clientContext = Substitute.For<IRtmpClientSessionContext>();
clientContext.InChunkSize.Returns(chunkSize);
(clientContext as IRtmpChunkStreamContextProvider).InChunkSize.Returns(chunkSize);
clientContext.GetChunkStreamContext(chunkStreamId).Returns(streamContext);

var payload = fixture.CreateMany<byte>(1000).ToArray();
var basicHeader = new RtmpChunkBasicHeader(1, chunkStreamId);
var messageHeader = new RtmpChunkMessageHeaderType1(
fixture.Create<uint>(), payload.Length, fixture.Create<byte>());

yield return new object[]
{
clientContext,
streamContext,
CreateStream(basicHeader, messageHeader, payload, chunkSize),
payload,
basicHeader,
messageHeader
};
}

{
var chunkSize = 500u;
var chunkStreamId = Helpers.CreateRandomChunkStreamId();

var streamContext = new RtmpChunkStreamContext(chunkStreamId);
var clientContext = Substitute.For<IRtmpClientSessionContext>();
clientContext.InChunkSize.Returns(chunkSize);
(clientContext as IRtmpChunkStreamContextProvider).InChunkSize.Returns(chunkSize);
clientContext.GetChunkStreamContext(chunkStreamId).Returns(streamContext);
// Arrange
_chunkMessageAggregator.AggregateChunkMessagesAsync(_networkStreamReader, _clientSessionContext, Arg.Any<CancellationToken>())
.Returns(new RtmpChunkMessageAggregationResult(_fixture.Create<bool>(), _fixture.Create<int>(), _chunkStreamContext));

var payload = new byte[0];
var basicHeader = new RtmpChunkBasicHeader(2, chunkStreamId);
var messageHeader = new RtmpChunkMessageHeaderType2(fixture.Create<uint>());
// Act
await _sut.Handle(_event, default);

yield return new object[]
{
clientContext,
streamContext,
CreateStream(basicHeader, messageHeader, payload, chunkSize),
payload,
basicHeader,
messageHeader
};
}
// Assert
_chunkMessageAggregator.Received(1).ResetChunkStreamContext(_chunkStreamContext);
}

private static Stream CreateStream<TRtmpChunkMessageHeader>
(RtmpChunkBasicHeader basicHeader, TRtmpChunkMessageHeader messageHeader, byte[] payload, uint chunkSize)
where TRtmpChunkMessageHeader : struct, IRtmpChunkMessageHeader
[Theory]
[InlineData(true)]
[InlineData(false)]
internal async Task Handle_Should_ReturnSucceeded_When_RtmpMessageHandlingIsSuccessful(bool success)
{
var writer = new RtmpChunkMessageWriterService();
_chunkMessageAggregator.AggregateChunkMessagesAsync(_networkStreamReader, _clientSessionContext, Arg.Any<CancellationToken>())
.Returns(new RtmpChunkMessageAggregationResult(true, _fixture.Create<int>(), _chunkStreamContext));

_dispatcher.DispatchAsync(_chunkStreamContext, _clientSessionContext, Arg.Any<CancellationToken>())
.Returns(success);

using var payloadBuffer = new DataBuffer();
payloadBuffer.Write(payload);
payloadBuffer.MoveTo(0);
// Act
var result = await _sut.Handle(_event, default);

using var tempBuffer = new DataBuffer();
writer.Write(tempBuffer, basicHeader, messageHeader, payloadBuffer, chunkSize);
return new MemoryStream(tempBuffer.UnderlyingBuffer.Take(tempBuffer.Size).ToArray());
// Assert
result.Succeeded.Should().Be(success);
}
}
}
Loading

0 comments on commit e60962f

Please sign in to comment.