diff --git a/LiveStreamingServerNet.sln b/LiveStreamingServerNet.sln index 262219b3..bb1f9425 100644 --- a/LiveStreamingServerNet.sln +++ b/LiveStreamingServerNet.sln @@ -83,7 +83,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiveStreamingServerNet.Rtmp EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiveStreamingServerNet.Rtmp.Test", "test\LiveStreamingServerNet.Rtmp.Test\LiveStreamingServerNet.Rtmp.Test.csproj", "{772B6681-727C-469B-82B5-2760451F4E0F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LiveStreamingServerNet.RtmpClientPlayDemo", "samples\LiveStreamingServerNet.RtmpClientPlayDemo\LiveStreamingServerNet.RtmpClientPlayDemo.csproj", "{17841723-77DF-4A42-9C24-38EB71203D39}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiveStreamingServerNet.RtmpClientPlayDemo", "samples\LiveStreamingServerNet.RtmpClientPlayDemo\LiveStreamingServerNet.RtmpClientPlayDemo.csproj", "{17841723-77DF-4A42-9C24-38EB71203D39}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LiveStreamingServerNet.RtmpClientPublishDemo", "samples\LiveStreamingServerNet.RtmpClientPublishDemo\LiveStreamingServerNet.RtmpClientPublishDemo.csproj", "{6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -247,6 +249,10 @@ Global {17841723-77DF-4A42-9C24-38EB71203D39}.Debug|Any CPU.Build.0 = Debug|Any CPU {17841723-77DF-4A42-9C24-38EB71203D39}.Release|Any CPU.ActiveCfg = Release|Any CPU {17841723-77DF-4A42-9C24-38EB71203D39}.Release|Any CPU.Build.0 = Release|Any CPU + {6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -272,6 +278,7 @@ Global {6584699E-532D-49EC-B521-B2151587588F} = {C85AC04A-5060-4C4B-830A-D7F51F602749} {772B6681-727C-469B-82B5-2760451F4E0F} = {C85AC04A-5060-4C4B-830A-D7F51F602749} {17841723-77DF-4A42-9C24-38EB71203D39} = {45761E33-F086-403D-BB6E-8059F8CC6BA8} + {6887F6DF-CE3F-4D6E-8E66-6A8151CA2680} = {45761E33-F086-403D-BB6E-8059F8CC6BA8} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {0A9A3928-2852-49B9-85EF-F00480E9FE7F} diff --git a/samples/LiveStreamingServerNet.RtmpClientPublishDemo/FlvReader.cs b/samples/LiveStreamingServerNet.RtmpClientPublishDemo/FlvReader.cs new file mode 100644 index 00000000..73c4b835 --- /dev/null +++ b/samples/LiveStreamingServerNet.RtmpClientPublishDemo/FlvReader.cs @@ -0,0 +1,114 @@ +using LiveStreamingServerNet.Utilities.Buffers; +using LiveStreamingServerNet.Utilities.Buffers.Contracts; + +namespace LiveStreamingServerNet.RtmpClientPublishDemo +{ + public class FlvReader : IDisposable + { + private readonly Stream _stream; + private readonly IDataBuffer _buffer; + + public FlvReader(Stream stream) + { + _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + _buffer = new DataBuffer(); + } + + public async ValueTask ReadHeaderAsync(CancellationToken cancellationToken = default) + { + try + { + await _buffer.FromStreamData(_stream, 9, cancellationToken); + + var signature = _buffer.ReadBytes(3); + if (signature[0] != 'F' || signature[1] != 'L' || signature[2] != 'V') + throw new InvalidDataException("Invalid FLV signature."); + + var version = _buffer.ReadByte(); + var flags = _buffer.ReadByte(); + var headerSize = _buffer.ReadInt32BigEndian(); + + var header = new FlvHeader(version, flags, headerSize); + + if (headerSize > 9) + await _buffer.FromStreamData(_stream, headerSize - 9, cancellationToken); + + await _buffer.FromStreamData(_stream, 4, cancellationToken); + var previousTagSize = _buffer.ReadUInt32BigEndian(); + + if (previousTagSize != 0) + throw new InvalidDataException("Invalid PreviousTagSize."); + + return header; + } + catch (EndOfStreamException) + { + return null; + } + } + + public async ValueTask ReadTagAsync(CancellationToken cancellationToken = default) + { + try + { + await _buffer.FromStreamData(_stream, FlvTagHeader.Size, cancellationToken); + + var tagHeader = FlvTagHeader.Read(_buffer); + + await _buffer.FromStreamData(_stream, (int)tagHeader.DataSize, cancellationToken); + var payload = _buffer.ToRentedBuffer(); + + await _buffer.FromStreamData(_stream, 4, cancellationToken); + var previousTagSize = _buffer.ReadUInt32BigEndian(); + + if (previousTagSize != (11 + tagHeader.DataSize)) + throw new InvalidDataException("Mismatch in PreviousTagSize."); + + return new FlvTag(tagHeader, payload); + } + catch (EndOfStreamException) + { + return null; + } + } + + public void Dispose() + { + _buffer.Dispose(); + } + } + + public enum FlvTagType : byte + { + Audio = 8, + Video = 9, + ScriptData = 18 + } + + public record struct FlvHeader(byte Version, byte Flags, int HeaderSize) + { + public bool HasAudio => (Flags & 0x04) != 0; + public bool HasVideo => (Flags & 0x01) != 0; + } + + public record struct FlvTagHeader(FlvTagType TagType, uint DataSize, uint Timestamp) + { + public const int Size = 11; + + public static FlvTagHeader Read(IDataBuffer dataBuffer) + { + var tagType = (FlvTagType)dataBuffer.ReadByte(); + var dataSize = dataBuffer.ReadUInt24BigEndian(); + + var timestampLower = dataBuffer.ReadUInt24BigEndian(); + var timestampExtended = dataBuffer.ReadByte(); + var timestamp = ((uint)timestampExtended << 24) | timestampLower; + + dataBuffer.ReadUInt24BigEndian(); + + return new FlvTagHeader(tagType, dataSize, timestamp); + } + } + + public record FlvTag(FlvTagHeader Header, IRentedBuffer Payload); +} diff --git a/samples/LiveStreamingServerNet.RtmpClientPublishDemo/LiveStreamingServerNet.RtmpClientPublishDemo.csproj b/samples/LiveStreamingServerNet.RtmpClientPublishDemo/LiveStreamingServerNet.RtmpClientPublishDemo.csproj new file mode 100644 index 00000000..c383ed1c --- /dev/null +++ b/samples/LiveStreamingServerNet.RtmpClientPublishDemo/LiveStreamingServerNet.RtmpClientPublishDemo.csproj @@ -0,0 +1,20 @@ + + + + Exe + net8.0 + enable + enable + false + false + + + + + + + + + + + diff --git a/samples/LiveStreamingServerNet.RtmpClientPublishDemo/Program.cs b/samples/LiveStreamingServerNet.RtmpClientPublishDemo/Program.cs new file mode 100644 index 00000000..eac9d2d3 --- /dev/null +++ b/samples/LiveStreamingServerNet.RtmpClientPublishDemo/Program.cs @@ -0,0 +1,100 @@ +using LiveStreamingServerNet.Rtmp.Client; +using LiveStreamingServerNet.Rtmp.Client.Contracts; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System.Diagnostics; + +namespace LiveStreamingServerNet.RtmpClientPublishDemo +{ + public class Program + { + /// + /// Connect to rtmp://127.0.0.1/live/demo and publish the stream from input.flv. + /// + public static async Task Main() + { + var rtmpUrl = "rtmp://127.0.0.1/live/demo"; + var parsedRtmpUrl = await RtmpUrlParser.ParseAsync(rtmpUrl); + + var serverEndPoint = parsedRtmpUrl.ServerEndPoint; + var information = new Dictionary { ["tcUrl"] = parsedRtmpUrl.TcUrl }; + var appName = parsedRtmpUrl.AppName; + var streamName = parsedRtmpUrl.StreamName; + + var rtmpClient = RtmpClientBuilder.Create() + .ConfigureLogging(options => options.AddConsole().SetMinimumLevel(LogLevel.Trace)) + .Build(); + + var logger = rtmpClient.Services.GetRequiredService>(); + + await rtmpClient.ConnectAsync(serverEndPoint, appName, information); + + var rtmpStream = await rtmpClient.CreateStreamAsync(); + + await PublishStreamFromFlvAsync(streamName, rtmpClient, rtmpStream, logger); + } + + private static async Task PublishStreamFromFlvAsync(string streamName, IRtmpClient rtmpClient, IRtmpStream rtmpStream, ILogger logger) + { + using var fileStream = new FileStream("input.flv", FileMode.Open, FileAccess.Read, FileShare.Read, 512 * 1024); + using var flvReader = new FlvReader(fileStream); + + rtmpStream.OnUserControlEventReceived += (sender, e) => + logger.LogInformation($"User control event received: {e.EventType}"); + + rtmpStream.OnStatusReceived += (sender, e) => + logger.LogInformation($"Status received: {e.Code}"); + + rtmpStream.Publish.Publish(streamName); + + await Task.WhenAny(SendMediaDataAsync(rtmpStream, flvReader), rtmpClient.UntilStoppedAsync()); + + async Task SendMediaDataAsync(IRtmpStream stream, FlvReader flvReader) + { + var header = await flvReader.ReadHeaderAsync(); + + if (header == null) + return; + + (uint Timestamp, DateTime Time)? start = null; + + while (true) + { + var tag = await flvReader.ReadTagAsync(); + + if (tag == null) + return; + + try + { + if (tag.Header.TagType == FlvTagType.Audio) + { + await stream.Publish.SendAudioDataAsync(tag.Payload, tag.Header.Timestamp); + } + else if (tag.Header.TagType == FlvTagType.Video) + { + await stream.Publish.SendVideoDataAsync(tag.Payload, tag.Header.Timestamp); + } + + if (!start.HasValue) + { + start = (tag.Header.Timestamp, DateTime.UtcNow); + } + + var intendedTime = tag.Header.Timestamp - start.Value.Timestamp; + var elapsedTime = (DateTime.UtcNow - start.Value.Time).TotalMilliseconds; + + var delay = intendedTime - elapsedTime; + + if (delay > 0) + await Task.Delay((int)delay); + } + finally + { + tag.Payload.Unclaim(); + } + } + } + } + } +}