diff --git a/src/LiveStreamingServerNet.Rtmp.Client/Contracts/IRtmpStream.cs b/src/LiveStreamingServerNet.Rtmp.Client/Contracts/IRtmpStream.cs index 74cba9d0..a01e405f 100644 --- a/src/LiveStreamingServerNet.Rtmp.Client/Contracts/IRtmpStream.cs +++ b/src/LiveStreamingServerNet.Rtmp.Client/Contracts/IRtmpStream.cs @@ -1,4 +1,6 @@ -namespace LiveStreamingServerNet.Rtmp.Client.Contracts +using LiveStreamingServerNet.Utilities.Buffers.Contracts; + +namespace LiveStreamingServerNet.Rtmp.Client.Contracts { public interface IRtmpStream { @@ -18,5 +20,7 @@ public interface IRtmpSubscribeStream void Play(string streamName, double start, double duration, bool reset); event EventHandler> OnStreamMetaDataUpdated; + event EventHandler OnVideoDataReceived; + event EventHandler OnAudioDataReceived; } } diff --git a/src/LiveStreamingServerNet.Rtmp.Client/Internal/Contracts/IRtmpStreamContext.cs b/src/LiveStreamingServerNet.Rtmp.Client/Internal/Contracts/IRtmpStreamContext.cs index 4095fc08..5b6970c9 100644 --- a/src/LiveStreamingServerNet.Rtmp.Client/Internal/Contracts/IRtmpStreamContext.cs +++ b/src/LiveStreamingServerNet.Rtmp.Client/Internal/Contracts/IRtmpStreamContext.cs @@ -1,4 +1,6 @@ -namespace LiveStreamingServerNet.Rtmp.Client.Internal.Contracts +using LiveStreamingServerNet.Utilities.Buffers.Contracts; + +namespace LiveStreamingServerNet.Rtmp.Client.Internal.Contracts { internal interface IRtmpStreamContext : IDisposable { @@ -33,7 +35,11 @@ internal interface IRtmpPublishStreamContext : IRtmpMediaStreamContext internal interface IRtmpSubscribeStreamContext : IRtmpMediaStreamContext { IReadOnlyDictionary? StreamMetaData { get; set; } + void ReceiveVideoData(IRentedBuffer rentedBuffer); + void ReceiveAudioData(IRentedBuffer rentedBuffer); event EventHandler> OnStreamMetaDataUpdated; + event EventHandler OnVideoDataReceived; + event EventHandler OnAudioDataReceived; } } diff --git a/src/LiveStreamingServerNet.Rtmp.Client/Internal/Logging/LoggerExtensions.cs b/src/LiveStreamingServerNet.Rtmp.Client/Internal/Logging/LoggerExtensions.cs index 1f8e35d1..b02f3938 100644 --- a/src/LiveStreamingServerNet.Rtmp.Client/Internal/Logging/LoggerExtensions.cs +++ b/src/LiveStreamingServerNet.Rtmp.Client/Internal/Logging/LoggerExtensions.cs @@ -39,5 +39,8 @@ internal static partial class LoggerExtensions [LoggerMessage(LogLevel.Error, "SessionId: {SessionId} | An error occurred while handling command result")] public static partial void CommandResultHandlingError(this ILogger logger, uint sessionId, Exception ex); + + [LoggerMessage(LogLevel.Error, "SessionId: {SessionId} | StreamId: {StreamId} | Subscribe stream not yet created")] + public static partial void SubscribeStreamNotYetCreated(this ILogger logger, uint sessionId, uint streamId); } } \ No newline at end of file diff --git a/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpEventHandlers/Media/RtmpAudioMessageHandler.cs b/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpEventHandlers/Media/RtmpAudioMessageHandler.cs new file mode 100644 index 00000000..d0b60d4b --- /dev/null +++ b/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpEventHandlers/Media/RtmpAudioMessageHandler.cs @@ -0,0 +1,52 @@ +using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts; +using LiveStreamingServerNet.Rtmp.Client.Internal.Logging; +using LiveStreamingServerNet.Rtmp.Internal.Contracts; +using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers; +using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Attributes; +using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Contracts; +using LiveStreamingServerNet.Utilities.Buffers.Contracts; +using Microsoft.Extensions.Logging; + +namespace LiveStreamingServerNet.Rtmp.Client.Internal.RtmpEventHandlers.Media +{ + [RtmpMessageType(RtmpMessageType.AudioMessage)] + internal class RtmpAudioMessageHandler : IRtmpMessageHandler + { + private readonly ILogger _logger; + + public RtmpAudioMessageHandler( + ILogger logger) + { + _logger = logger; + } + + public ValueTask HandleAsync( + IRtmpChunkStreamContext chunkStreamContext, + IRtmpSessionContext context, + IDataBuffer payloadBuffer, + CancellationToken cancellationToken) + { + var streamId = chunkStreamContext.MessageHeader.MessageStreamId; + var subscribeStreamContext = context.GetStreamContext(streamId)?.SubscribeContext; + + if (subscribeStreamContext == null) + { + _logger.SubscribeStreamNotYetCreated(context.Session.Id, streamId); + return ValueTask.FromResult(true); + } + + var rentedBuffer = payloadBuffer.ToRentedBuffer(); + + try + { + subscribeStreamContext.ReceiveAudioData(rentedBuffer); + + return ValueTask.FromResult(true); + } + finally + { + rentedBuffer.Unclaim(); + } + } + } +} diff --git a/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpEventHandlers/Media/RtmpVideoMessageHandler.cs b/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpEventHandlers/Media/RtmpVideoMessageHandler.cs new file mode 100644 index 00000000..9b746d84 --- /dev/null +++ b/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpEventHandlers/Media/RtmpVideoMessageHandler.cs @@ -0,0 +1,52 @@ +using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts; +using LiveStreamingServerNet.Rtmp.Client.Internal.Logging; +using LiveStreamingServerNet.Rtmp.Internal.Contracts; +using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers; +using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Attributes; +using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Contracts; +using LiveStreamingServerNet.Utilities.Buffers.Contracts; +using Microsoft.Extensions.Logging; + +namespace LiveStreamingServerNet.Rtmp.Client.Internal.RtmpEventHandlers.Media +{ + [RtmpMessageType(RtmpMessageType.VideoMessage)] + internal class RtmpVideoMessageHandler : IRtmpMessageHandler + { + private readonly ILogger _logger; + + public RtmpVideoMessageHandler( + ILogger logger) + { + _logger = logger; + } + + public ValueTask HandleAsync( + IRtmpChunkStreamContext chunkStreamContext, + IRtmpSessionContext context, + IDataBuffer payloadBuffer, + CancellationToken cancellationToken) + { + var streamId = chunkStreamContext.MessageHeader.MessageStreamId; + var subscribeStreamContext = context.GetStreamContext(streamId)?.SubscribeContext; + + if (subscribeStreamContext == null) + { + _logger.SubscribeStreamNotYetCreated(context.Session.Id, streamId); + return ValueTask.FromResult(true); + } + + var rentedBuffer = payloadBuffer.ToRentedBuffer(); + + try + { + subscribeStreamContext.ReceiveVideoData(rentedBuffer); + + return ValueTask.FromResult(true); + } + finally + { + rentedBuffer.Unclaim(); + } + } + } +} diff --git a/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStream.cs b/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStream.cs index 7d46dbe8..39592a6c 100644 --- a/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStream.cs +++ b/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStream.cs @@ -1,6 +1,7 @@ using LiveStreamingServerNet.Rtmp.Client.Contracts; using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts; using LiveStreamingServerNet.Rtmp.Client.Internal.Services.Contracts; +using LiveStreamingServerNet.Utilities.Buffers.Contracts; using Microsoft.Extensions.Logging; namespace LiveStreamingServerNet.Rtmp.Client.Internal @@ -51,8 +52,11 @@ internal class RtmpSubscribeStream : IRtmpSubscribeStream private readonly IRtmpChunkMessageSenderService _chunkMessageSender; private readonly IRtmpCommanderService _commander; private readonly ILogger _logger; + public IReadOnlyDictionary? StreamMetaData { get; set; } public event EventHandler>? OnStreamMetaDataUpdated; + public event EventHandler? OnVideoDataReceived; + public event EventHandler? OnAudioDataReceived; public RtmpSubscribeStream( IRtmpStreamContext streamContext, @@ -82,11 +86,15 @@ public void Play(string streamName, double start, double duration, bool reset) private void OnSubscribeContextCreated(object? sender, IRtmpSubscribeStreamContext subscribeStreamContext) { subscribeStreamContext.OnStreamMetaDataUpdated += OnStreamContextMetaDataUpdated; + subscribeStreamContext.OnVideoDataReceived += OnStreamContextVideoDataReceived; + subscribeStreamContext.OnAudioDataReceived += OnStreamContextAudioDataReceived; } private void OnSubscribeContextRemoved(object? sender, IRtmpSubscribeStreamContext subscribeStreamContext) { subscribeStreamContext.OnStreamMetaDataUpdated -= OnStreamContextMetaDataUpdated; + subscribeStreamContext.OnVideoDataReceived -= OnStreamContextVideoDataReceived; + subscribeStreamContext.OnAudioDataReceived -= OnStreamContextAudioDataReceived; } private void OnStreamContextMetaDataUpdated(object? sender, IReadOnlyDictionary streamMetaData) @@ -94,6 +102,16 @@ private void OnStreamContextMetaDataUpdated(object? sender, IReadOnlyDictionary< StreamMetaData = streamMetaData; OnStreamMetaDataUpdated?.Invoke(this, streamMetaData); } + + private void OnStreamContextVideoDataReceived(object? sender, IRentedBuffer rentedBuffer) + { + OnVideoDataReceived?.Invoke(this, rentedBuffer); + } + + private void OnStreamContextAudioDataReceived(object? sender, IRentedBuffer rentedBuffer) + { + OnAudioDataReceived?.Invoke(this, rentedBuffer); + } } } -} +} \ No newline at end of file diff --git a/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStreamContext.cs b/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStreamContext.cs index d0a630de..622e70e7 100644 --- a/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStreamContext.cs +++ b/src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStreamContext.cs @@ -1,4 +1,5 @@ using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts; +using LiveStreamingServerNet.Utilities.Buffers.Contracts; namespace LiveStreamingServerNet.Rtmp.Client.Internal { @@ -91,14 +92,18 @@ public virtual void Dispose() { } internal class RtmpPublishStreamContext : RtmpMediaStreamContext, IRtmpPublishStreamContext { - public RtmpPublishStreamContext(IRtmpStreamContext streamContext) : base(streamContext) - { } + public RtmpPublishStreamContext(IRtmpStreamContext streamContext) : base(streamContext) { } } internal class RtmpSubscribeStreamContext : RtmpMediaStreamContext, IRtmpSubscribeStreamContext { private IReadOnlyDictionary? _streamMetaData; + public event EventHandler>? OnStreamMetaDataUpdated; + public event EventHandler? OnVideoDataReceived; + public event EventHandler? OnAudioDataReceived; + + public RtmpSubscribeStreamContext(IRtmpStreamContext streamContext) : base(streamContext) { } public IReadOnlyDictionary? StreamMetaData { @@ -114,7 +119,14 @@ public IReadOnlyDictionary? StreamMetaData } } - public RtmpSubscribeStreamContext(IRtmpStreamContext streamContext) : base(streamContext) - { } + public void ReceiveVideoData(IRentedBuffer rentedBuffer) + { + OnVideoDataReceived?.Invoke(this, rentedBuffer); + } + + public void ReceiveAudioData(IRentedBuffer rentedBuffer) + { + OnAudioDataReceived?.Invoke(this, rentedBuffer); + } } }