Skip to content

Commit

Permalink
feat: add video/audio data receiving
Browse files Browse the repository at this point in the history
  • Loading branch information
josephnhtam committed Sep 29, 2024
1 parent 015d3d0 commit ffcbf37
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace LiveStreamingServerNet.Rtmp.Client.Contracts
using LiveStreamingServerNet.Utilities.Buffers.Contracts;

namespace LiveStreamingServerNet.Rtmp.Client.Contracts
{
public interface IRtmpStream
{
Expand All @@ -18,5 +20,7 @@ public interface IRtmpSubscribeStream
void Play(string streamName, double start, double duration, bool reset);

event EventHandler<IReadOnlyDictionary<string, object>> OnStreamMetaDataUpdated;
event EventHandler<IRentedBuffer> OnVideoDataReceived;
event EventHandler<IRentedBuffer> OnAudioDataReceived;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace LiveStreamingServerNet.Rtmp.Client.Internal.Contracts
using LiveStreamingServerNet.Utilities.Buffers.Contracts;

namespace LiveStreamingServerNet.Rtmp.Client.Internal.Contracts
{
internal interface IRtmpStreamContext : IDisposable
{
Expand Down Expand Up @@ -33,7 +35,11 @@ internal interface IRtmpPublishStreamContext : IRtmpMediaStreamContext
internal interface IRtmpSubscribeStreamContext : IRtmpMediaStreamContext
{
IReadOnlyDictionary<string, object>? StreamMetaData { get; set; }
void ReceiveVideoData(IRentedBuffer rentedBuffer);
void ReceiveAudioData(IRentedBuffer rentedBuffer);

event EventHandler<IReadOnlyDictionary<string, object>> OnStreamMetaDataUpdated;
event EventHandler<IRentedBuffer> OnVideoDataReceived;
event EventHandler<IRentedBuffer> OnAudioDataReceived;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@ internal static partial class LoggerExtensions

[LoggerMessage(LogLevel.Error, "SessionId: {SessionId} | An error occurred while handling command result")]
public static partial void CommandResultHandlingError(this ILogger logger, uint sessionId, Exception ex);

[LoggerMessage(LogLevel.Error, "SessionId: {SessionId} | StreamId: {StreamId} | Subscribe stream not yet created")]
public static partial void SubscribeStreamNotYetCreated(this ILogger logger, uint sessionId, uint streamId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Client.Internal.Logging;
using LiveStreamingServerNet.Rtmp.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers;
using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Attributes;
using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Contracts;
using LiveStreamingServerNet.Utilities.Buffers.Contracts;
using Microsoft.Extensions.Logging;

namespace LiveStreamingServerNet.Rtmp.Client.Internal.RtmpEventHandlers.Media
{
[RtmpMessageType(RtmpMessageType.AudioMessage)]
internal class RtmpAudioMessageHandler : IRtmpMessageHandler<IRtmpSessionContext>
{
private readonly ILogger _logger;

public RtmpAudioMessageHandler(
ILogger<RtmpAudioMessageHandler> logger)
{
_logger = logger;
}

public ValueTask<bool> HandleAsync(
IRtmpChunkStreamContext chunkStreamContext,
IRtmpSessionContext context,
IDataBuffer payloadBuffer,
CancellationToken cancellationToken)
{
var streamId = chunkStreamContext.MessageHeader.MessageStreamId;
var subscribeStreamContext = context.GetStreamContext(streamId)?.SubscribeContext;

if (subscribeStreamContext == null)
{
_logger.SubscribeStreamNotYetCreated(context.Session.Id, streamId);
return ValueTask.FromResult(true);
}

var rentedBuffer = payloadBuffer.ToRentedBuffer();

try
{
subscribeStreamContext.ReceiveAudioData(rentedBuffer);

return ValueTask.FromResult(true);
}
finally
{
rentedBuffer.Unclaim();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Client.Internal.Logging;
using LiveStreamingServerNet.Rtmp.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers;
using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Attributes;
using LiveStreamingServerNet.Rtmp.Internal.RtmpEventHandlers.Dispatcher.Contracts;
using LiveStreamingServerNet.Utilities.Buffers.Contracts;
using Microsoft.Extensions.Logging;

namespace LiveStreamingServerNet.Rtmp.Client.Internal.RtmpEventHandlers.Media
{
[RtmpMessageType(RtmpMessageType.VideoMessage)]
internal class RtmpVideoMessageHandler : IRtmpMessageHandler<IRtmpSessionContext>
{
private readonly ILogger _logger;

public RtmpVideoMessageHandler(
ILogger<RtmpVideoMessageHandler> logger)
{
_logger = logger;
}

public ValueTask<bool> HandleAsync(
IRtmpChunkStreamContext chunkStreamContext,
IRtmpSessionContext context,
IDataBuffer payloadBuffer,
CancellationToken cancellationToken)
{
var streamId = chunkStreamContext.MessageHeader.MessageStreamId;
var subscribeStreamContext = context.GetStreamContext(streamId)?.SubscribeContext;

if (subscribeStreamContext == null)
{
_logger.SubscribeStreamNotYetCreated(context.Session.Id, streamId);
return ValueTask.FromResult(true);
}

var rentedBuffer = payloadBuffer.ToRentedBuffer();

try
{
subscribeStreamContext.ReceiveVideoData(rentedBuffer);

return ValueTask.FromResult(true);
}
finally
{
rentedBuffer.Unclaim();
}
}
}
}
20 changes: 19 additions & 1 deletion src/LiveStreamingServerNet.Rtmp.Client/Internal/RtmpStream.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using LiveStreamingServerNet.Rtmp.Client.Contracts;
using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts;
using LiveStreamingServerNet.Rtmp.Client.Internal.Services.Contracts;
using LiveStreamingServerNet.Utilities.Buffers.Contracts;
using Microsoft.Extensions.Logging;

namespace LiveStreamingServerNet.Rtmp.Client.Internal
Expand Down Expand Up @@ -51,8 +52,11 @@ internal class RtmpSubscribeStream : IRtmpSubscribeStream
private readonly IRtmpChunkMessageSenderService _chunkMessageSender;
private readonly IRtmpCommanderService _commander;
private readonly ILogger _logger;

public IReadOnlyDictionary<string, object>? StreamMetaData { get; set; }
public event EventHandler<IReadOnlyDictionary<string, object>>? OnStreamMetaDataUpdated;
public event EventHandler<IRentedBuffer>? OnVideoDataReceived;
public event EventHandler<IRentedBuffer>? OnAudioDataReceived;

public RtmpSubscribeStream(
IRtmpStreamContext streamContext,
Expand Down Expand Up @@ -82,18 +86,32 @@ public void Play(string streamName, double start, double duration, bool reset)
private void OnSubscribeContextCreated(object? sender, IRtmpSubscribeStreamContext subscribeStreamContext)
{
subscribeStreamContext.OnStreamMetaDataUpdated += OnStreamContextMetaDataUpdated;
subscribeStreamContext.OnVideoDataReceived += OnStreamContextVideoDataReceived;
subscribeStreamContext.OnAudioDataReceived += OnStreamContextAudioDataReceived;
}

private void OnSubscribeContextRemoved(object? sender, IRtmpSubscribeStreamContext subscribeStreamContext)
{
subscribeStreamContext.OnStreamMetaDataUpdated -= OnStreamContextMetaDataUpdated;
subscribeStreamContext.OnVideoDataReceived -= OnStreamContextVideoDataReceived;
subscribeStreamContext.OnAudioDataReceived -= OnStreamContextAudioDataReceived;
}

private void OnStreamContextMetaDataUpdated(object? sender, IReadOnlyDictionary<string, object> streamMetaData)
{
StreamMetaData = streamMetaData;
OnStreamMetaDataUpdated?.Invoke(this, streamMetaData);
}

private void OnStreamContextVideoDataReceived(object? sender, IRentedBuffer rentedBuffer)
{
OnVideoDataReceived?.Invoke(this, rentedBuffer);
}

private void OnStreamContextAudioDataReceived(object? sender, IRentedBuffer rentedBuffer)
{
OnAudioDataReceived?.Invoke(this, rentedBuffer);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using LiveStreamingServerNet.Rtmp.Client.Internal.Contracts;
using LiveStreamingServerNet.Utilities.Buffers.Contracts;

namespace LiveStreamingServerNet.Rtmp.Client.Internal
{
Expand Down Expand Up @@ -91,14 +92,18 @@ public virtual void Dispose() { }

internal class RtmpPublishStreamContext : RtmpMediaStreamContext, IRtmpPublishStreamContext
{
public RtmpPublishStreamContext(IRtmpStreamContext streamContext) : base(streamContext)
{ }
public RtmpPublishStreamContext(IRtmpStreamContext streamContext) : base(streamContext) { }
}

internal class RtmpSubscribeStreamContext : RtmpMediaStreamContext, IRtmpSubscribeStreamContext
{
private IReadOnlyDictionary<string, object>? _streamMetaData;

public event EventHandler<IReadOnlyDictionary<string, object>>? OnStreamMetaDataUpdated;
public event EventHandler<IRentedBuffer>? OnVideoDataReceived;
public event EventHandler<IRentedBuffer>? OnAudioDataReceived;

public RtmpSubscribeStreamContext(IRtmpStreamContext streamContext) : base(streamContext) { }

public IReadOnlyDictionary<string, object>? StreamMetaData
{
Expand All @@ -114,7 +119,14 @@ public IReadOnlyDictionary<string, object>? StreamMetaData
}
}

public RtmpSubscribeStreamContext(IRtmpStreamContext streamContext) : base(streamContext)
{ }
public void ReceiveVideoData(IRentedBuffer rentedBuffer)
{
OnVideoDataReceived?.Invoke(this, rentedBuffer);
}

public void ReceiveAudioData(IRentedBuffer rentedBuffer)
{
OnAudioDataReceived?.Invoke(this, rentedBuffer);
}
}
}

0 comments on commit ffcbf37

Please sign in to comment.