Skip to content

Commit

Permalink
refactor: add IRtmpAcknowledgementHandlerService
Browse files Browse the repository at this point in the history
  • Loading branch information
josephnhtam committed Oct 4, 2024
1 parent e60962f commit d49361e
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ private static IServiceCollection AddRtmpServices(this IServiceCollection servic
.AddSingleton<IRtmpProtocolControlService, RtmpProtocolControlService>()
.AddSingleton<IRtmpCommandMessageSenderService, RtmpCommandMessageSenderService>()
.AddSingleton<IRtmpCommanderService, RtmpCommanderService>()
.AddSingleton<IRtmpMediaDataSenderService, RtmpMediaDataSenderService>();
.AddSingleton<IRtmpMediaDataSenderService, RtmpMediaDataSenderService>()
.AddSingleton<IRtmpAcknowledgementHandlerService, RtmpAcknowledgementHandlerService>();

services.AddSingleton<RtmpCommandResultManagerService>()
.AddSingleton<IRtmpCommandResultManagerService>(sp => sp.GetRequiredService<RtmpCommandResultManagerService>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ namespace LiveStreamingServerNet.Rtmp.Client.Internal.RtmpEventHandlers
internal class RtmpChunkEventHandler : IRequestHandler<RtmpChunkEvent, RtmpEventConsumingResult>
{
private readonly IRtmpMessageDispatcher<IRtmpSessionContext> _dispatcher;
private readonly IRtmpProtocolControlService _protocolControl;
private readonly IRtmpChunkMessageAggregatorService _chunkMessageAggregator;
private readonly IRtmpAcknowledgementHandlerService _acknowledgementHandler;
private readonly ILogger _logger;

public RtmpChunkEventHandler(
IRtmpMessageDispatcher<IRtmpSessionContext> dispatcher,
IRtmpProtocolControlService protocolControl,
IRtmpChunkMessageAggregatorService chunkMessageAggregator,
IRtmpAcknowledgementHandlerService acknowledgementHandler,
ILogger<RtmpChunkEventHandler> logger)
{
_dispatcher = dispatcher;
_protocolControl = protocolControl;
_chunkMessageAggregator = chunkMessageAggregator;
_acknowledgementHandler = acknowledgementHandler;
_logger = logger;
}

Expand All @@ -41,31 +41,9 @@ public async ValueTask<RtmpEventConsumingResult> Handle(RtmpChunkEvent @event, C
return new RtmpEventConsumingResult(false, aggregationResult.ChunkMessageSize);
}

HandleAcknowledgement(@event, aggregationResult.ChunkMessageSize);
return new RtmpEventConsumingResult(true, aggregationResult.ChunkMessageSize);
}

private void HandleAcknowledgement(RtmpChunkEvent @event, int consumedBytes)
{
var context = @event.Context;

if (context.InWindowAcknowledgementSize == 0)
return;
_acknowledgementHandler.Handle(@event.Context, aggregationResult.ChunkMessageSize);

context.SequenceNumber += (uint)consumedBytes;
if (context.SequenceNumber - context.LastAcknowledgedSequenceNumber >= context.InWindowAcknowledgementSize)
{
_protocolControl.Acknowledgement(context.SequenceNumber);

const uint overflow = 0xf0000000;
if (context.SequenceNumber >= overflow)
{
context.SequenceNumber -= overflow;
context.LastAcknowledgedSequenceNumber -= overflow;
}

context.LastAcknowledgedSequenceNumber = context.SequenceNumber;
}
return new RtmpEventConsumingResult(true, aggregationResult.ChunkMessageSize);
}

private async Task<bool> HandleRtmpMessageAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts;

namespace LiveStreamingServerNet.Rtmp.Client.Internal.Services.Contracts
{
internal interface IRtmpAcknowledgementHandlerService
{
void Handle(IRtmpSessionContext context, int receivedBytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Client.Internal.Services.Contracts;

namespace LiveStreamingServerNet.Rtmp.Client.Internal.Services
{
internal class RtmpAcknowledgementHandlerService : IRtmpAcknowledgementHandlerService
{
private readonly IRtmpProtocolControlService _protocolControl;

public RtmpAcknowledgementHandlerService(IRtmpProtocolControlService protocolControl)
{
_protocolControl = protocolControl;
}

public void Handle(IRtmpSessionContext context, int receivedBytes)
{
if (context.InWindowAcknowledgementSize == 0)
return;

context.SequenceNumber += (uint)receivedBytes;
if (context.SequenceNumber - context.LastAcknowledgedSequenceNumber >= context.InWindowAcknowledgementSize)
{
_protocolControl.Acknowledgement(context.SequenceNumber);

const uint overflow = 0xf0000000;
if (context.SequenceNumber >= overflow)
{
context.SequenceNumber -= overflow;
context.LastAcknowledgedSequenceNumber -= overflow;
}

context.LastAcknowledgedSequenceNumber = context.SequenceNumber;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ private static IServiceCollection AddRtmpServices(this IServiceCollection servic
.AddSingleton<IRtmpMediaMessageBroadcasterService, RtmpMediaMessageBroadcasterService>()
.AddSingleton<IRtmpStreamDeletionService, RtmpStreamDeletionService>()
.AddSingleton<IRtmpMediaMessageInterceptionService, RtmpMediaMessageInterceptionService>()
.AddSingleton<IRtmpMediaCachingInterceptionService, RtmpMediaCachingInterceptionService>();
.AddSingleton<IRtmpMediaCachingInterceptionService, RtmpMediaCachingInterceptionService>()
.AddSingleton<IRtmpAcknowledgementHandlerService, RtmpAcknowledgementHandlerService>();

services.AddSingleton<IRtmpStreamInfoManager, RtmpStreamInfoManager>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ namespace LiveStreamingServerNet.Rtmp.Server.Internal.RtmpEventHandlers
internal class RtmpChunkEventHandler : IRequestHandler<RtmpChunkEvent, RtmpEventConsumingResult>
{
private readonly IRtmpMessageDispatcher<IRtmpClientSessionContext> _dispatcher;
private readonly IRtmpProtocolControlService _protocolControl;
private readonly IRtmpChunkMessageAggregatorService _chunkMessageAggregator;
private readonly IRtmpAcknowledgementHandlerService _acknowledgementHandler;
private readonly ILogger _logger;

public RtmpChunkEventHandler(
IRtmpMessageDispatcher<IRtmpClientSessionContext> dispatcher,
IRtmpProtocolControlService protocolControl,
IRtmpChunkMessageAggregatorService chunkMessageAggregator,
IRtmpAcknowledgementHandlerService acknowledgementHandler,
ILogger<RtmpChunkEventHandler> logger)
{
_dispatcher = dispatcher;
_protocolControl = protocolControl;
_chunkMessageAggregator = chunkMessageAggregator;
_acknowledgementHandler = acknowledgementHandler;
_logger = logger;
}

Expand All @@ -42,31 +42,9 @@ public async ValueTask<RtmpEventConsumingResult> Handle(RtmpChunkEvent @event, C
return new RtmpEventConsumingResult(false, aggregationResult.ChunkMessageSize);
}

HandleAcknowledgement(@event, aggregationResult.ChunkMessageSize);
return new RtmpEventConsumingResult(true, aggregationResult.ChunkMessageSize);
}

private void HandleAcknowledgement(RtmpChunkEvent @event, int consumedBytes)
{
var clientContext = @event.ClientContext;

if (clientContext.InWindowAcknowledgementSize == 0)
return;
_acknowledgementHandler.Handle(@event.ClientContext, aggregationResult.ChunkMessageSize);

clientContext.SequenceNumber += (uint)consumedBytes;
if (clientContext.SequenceNumber - clientContext.LastAcknowledgedSequenceNumber >= clientContext.InWindowAcknowledgementSize)
{
_protocolControl.Acknowledgement(clientContext, clientContext.SequenceNumber);

const uint overflow = 0xf0000000;
if (clientContext.SequenceNumber >= overflow)
{
clientContext.SequenceNumber -= overflow;
clientContext.LastAcknowledgedSequenceNumber -= overflow;
}

clientContext.LastAcknowledgedSequenceNumber = clientContext.SequenceNumber;
}
return new RtmpEventConsumingResult(true, aggregationResult.ChunkMessageSize);
}

private async Task<bool> HandleRtmpMessageAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using LiveStreamingServerNet.Rtmp.Server.Internal.Contracts;

namespace LiveStreamingServerNet.Rtmp.Server.Internal.Services.Contracts
{
internal interface IRtmpAcknowledgementHandlerService
{
void Handle(IRtmpClientSessionContext clientContext, int receivedBytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using LiveStreamingServerNet.Rtmp.Server.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Server.Internal.Services.Contracts;

namespace LiveStreamingServerNet.Rtmp.Server.Internal.Services
{
internal class RtmpAcknowledgementHandlerService : IRtmpAcknowledgementHandlerService
{
private readonly IRtmpProtocolControlService _protocolControl;

public RtmpAcknowledgementHandlerService(IRtmpProtocolControlService protocolControl)
{
_protocolControl = protocolControl;
}

public void Handle(IRtmpClientSessionContext clientContext, int receivedBytes)
{
if (clientContext.InWindowAcknowledgementSize == 0)
return;

clientContext.SequenceNumber += (uint)receivedBytes;
if (clientContext.SequenceNumber - clientContext.LastAcknowledgedSequenceNumber >= clientContext.InWindowAcknowledgementSize)
{
_protocolControl.Acknowledgement(clientContext, clientContext.SequenceNumber);

const uint overflow = 0xf0000000;
if (clientContext.SequenceNumber >= overflow)
{
clientContext.SequenceNumber -= overflow;
clientContext.LastAcknowledgedSequenceNumber -= overflow;
}

clientContext.LastAcknowledgedSequenceNumber = clientContext.SequenceNumber;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace LiveStreamingServerNet.Rtmp.Server.Test.RtmpEventHandlers
public class RtmpChunkEventHandlerTest
{
private readonly IRtmpMessageDispatcher<IRtmpClientSessionContext> _dispatcher;
private readonly IRtmpProtocolControlService _protocolControl;
private readonly IRtmpAcknowledgementHandlerService _acknowledgementHandler;
private readonly IRtmpChunkMessageAggregatorService _chunkMessageAggregator;
private readonly ILogger<RtmpChunkEventHandler> _logger;
private readonly RtmpChunkEvent _event;
Expand All @@ -33,14 +33,15 @@ public RtmpChunkEventHandlerTest()
_clientSessionContext = Substitute.For<IRtmpClientSessionContext>();
_chunkStreamContext = Substitute.For<IRtmpChunkStreamContext>();
_dispatcher = Substitute.For<IRtmpMessageDispatcher<IRtmpClientSessionContext>>();
_protocolControl = Substitute.For<IRtmpProtocolControlService>();
_acknowledgementHandler = Substitute.For<IRtmpAcknowledgementHandlerService>();
_chunkMessageAggregator = Substitute.For<IRtmpChunkMessageAggregatorService>();
_logger = Substitute.For<ILogger<RtmpChunkEventHandler>>();

_event = new RtmpChunkEvent { ClientContext = _clientSessionContext, NetworkStream = _networkStreamReader };
_clientSessionContext.GetChunkStreamContext(Arg.Any<uint>()).Returns(_chunkStreamContext);
_dispatcher.DispatchAsync(_chunkStreamContext, _clientSessionContext, Arg.Any<CancellationToken>()).Returns(true);

_sut = new RtmpChunkEventHandler(_dispatcher, _protocolControl, _chunkMessageAggregator, _logger);
_sut = new RtmpChunkEventHandler(_dispatcher, _chunkMessageAggregator, _acknowledgementHandler, _logger);
}

[Fact]
Expand Down Expand Up @@ -71,6 +72,22 @@ internal async Task Handle_Should_ResetChunkStreamContext()
_chunkMessageAggregator.Received(1).ResetChunkStreamContext(_chunkStreamContext);
}

[Fact]
internal async Task Handle_Should_InvokeAcknowledgmentHandler()
{
// Arrange
var chunkMessageSize = _fixture.Create<int>();

_chunkMessageAggregator.AggregateChunkMessagesAsync(_networkStreamReader, _clientSessionContext, Arg.Any<CancellationToken>())
.Returns(new RtmpChunkMessageAggregationResult(true, chunkMessageSize, _chunkStreamContext));

// Act
await _sut.Handle(_event, default);

// Assert
_acknowledgementHandler.Received(1).Handle(_clientSessionContext, chunkMessageSize);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
Expand Down

0 comments on commit d49361e

Please sign in to comment.