Skip to content

Commit

Permalink
feat: add RtmpClient publish demo
Browse files Browse the repository at this point in the history
  • Loading branch information
josephnhtam committed Oct 1, 2024
1 parent a58f6ee commit 0f2db0d
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 1 deletion.
9 changes: 8 additions & 1 deletion LiveStreamingServerNet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiveStreamingServerNet.Rtmp
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiveStreamingServerNet.Rtmp.Test", "test\LiveStreamingServerNet.Rtmp.Test\LiveStreamingServerNet.Rtmp.Test.csproj", "{772B6681-727C-469B-82B5-2760451F4E0F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LiveStreamingServerNet.RtmpClientPlayDemo", "samples\LiveStreamingServerNet.RtmpClientPlayDemo\LiveStreamingServerNet.RtmpClientPlayDemo.csproj", "{17841723-77DF-4A42-9C24-38EB71203D39}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiveStreamingServerNet.RtmpClientPlayDemo", "samples\LiveStreamingServerNet.RtmpClientPlayDemo\LiveStreamingServerNet.RtmpClientPlayDemo.csproj", "{17841723-77DF-4A42-9C24-38EB71203D39}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LiveStreamingServerNet.RtmpClientPublishDemo", "samples\LiveStreamingServerNet.RtmpClientPublishDemo\LiveStreamingServerNet.RtmpClientPublishDemo.csproj", "{6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -247,6 +249,10 @@ Global
{17841723-77DF-4A42-9C24-38EB71203D39}.Debug|Any CPU.Build.0 = Debug|Any CPU
{17841723-77DF-4A42-9C24-38EB71203D39}.Release|Any CPU.ActiveCfg = Release|Any CPU
{17841723-77DF-4A42-9C24-38EB71203D39}.Release|Any CPU.Build.0 = Release|Any CPU
{6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6887F6DF-CE3F-4D6E-8E66-6A8151CA2680}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -272,6 +278,7 @@ Global
{6584699E-532D-49EC-B521-B2151587588F} = {C85AC04A-5060-4C4B-830A-D7F51F602749}
{772B6681-727C-469B-82B5-2760451F4E0F} = {C85AC04A-5060-4C4B-830A-D7F51F602749}
{17841723-77DF-4A42-9C24-38EB71203D39} = {45761E33-F086-403D-BB6E-8059F8CC6BA8}
{6887F6DF-CE3F-4D6E-8E66-6A8151CA2680} = {45761E33-F086-403D-BB6E-8059F8CC6BA8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0A9A3928-2852-49B9-85EF-F00480E9FE7F}
Expand Down
114 changes: 114 additions & 0 deletions samples/LiveStreamingServerNet.RtmpClientPublishDemo/FlvReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using LiveStreamingServerNet.Utilities.Buffers;
using LiveStreamingServerNet.Utilities.Buffers.Contracts;

namespace LiveStreamingServerNet.RtmpClientPublishDemo
{
public class FlvReader : IDisposable
{
private readonly Stream _stream;
private readonly IDataBuffer _buffer;

public FlvReader(Stream stream)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_buffer = new DataBuffer();
}

public async ValueTask<FlvHeader?> ReadHeaderAsync(CancellationToken cancellationToken = default)
{
try
{
await _buffer.FromStreamData(_stream, 9, cancellationToken);

var signature = _buffer.ReadBytes(3);
if (signature[0] != 'F' || signature[1] != 'L' || signature[2] != 'V')
throw new InvalidDataException("Invalid FLV signature.");

var version = _buffer.ReadByte();
var flags = _buffer.ReadByte();
var headerSize = _buffer.ReadInt32BigEndian();

var header = new FlvHeader(version, flags, headerSize);

if (headerSize > 9)
await _buffer.FromStreamData(_stream, headerSize - 9, cancellationToken);

await _buffer.FromStreamData(_stream, 4, cancellationToken);
var previousTagSize = _buffer.ReadUInt32BigEndian();

if (previousTagSize != 0)
throw new InvalidDataException("Invalid PreviousTagSize.");

return header;
}
catch (EndOfStreamException)
{
return null;
}
}

public async ValueTask<FlvTag?> ReadTagAsync(CancellationToken cancellationToken = default)
{
try
{
await _buffer.FromStreamData(_stream, FlvTagHeader.Size, cancellationToken);

var tagHeader = FlvTagHeader.Read(_buffer);

await _buffer.FromStreamData(_stream, (int)tagHeader.DataSize, cancellationToken);
var payload = _buffer.ToRentedBuffer();

await _buffer.FromStreamData(_stream, 4, cancellationToken);
var previousTagSize = _buffer.ReadUInt32BigEndian();

if (previousTagSize != (11 + tagHeader.DataSize))
throw new InvalidDataException("Mismatch in PreviousTagSize.");

return new FlvTag(tagHeader, payload);
}
catch (EndOfStreamException)
{
return null;
}
}

public void Dispose()
{
_buffer.Dispose();
}
}

public enum FlvTagType : byte
{
Audio = 8,
Video = 9,
ScriptData = 18
}

public record struct FlvHeader(byte Version, byte Flags, int HeaderSize)
{
public bool HasAudio => (Flags & 0x04) != 0;
public bool HasVideo => (Flags & 0x01) != 0;
}

public record struct FlvTagHeader(FlvTagType TagType, uint DataSize, uint Timestamp)
{
public const int Size = 11;

public static FlvTagHeader Read(IDataBuffer dataBuffer)
{
var tagType = (FlvTagType)dataBuffer.ReadByte();
var dataSize = dataBuffer.ReadUInt24BigEndian();

var timestampLower = dataBuffer.ReadUInt24BigEndian();
var timestampExtended = dataBuffer.ReadByte();
var timestamp = ((uint)timestampExtended << 24) | timestampLower;

dataBuffer.ReadUInt24BigEndian();

return new FlvTagHeader(tagType, dataSize, timestamp);
}
}

public record FlvTag(FlvTagHeader Header, IRentedBuffer Payload);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<ServerGarbageCollection>false</ServerGarbageCollection>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\LiveStreamingServerNet.Rtmp.Client\LiveStreamingServerNet.Rtmp.Client.csproj" />
</ItemGroup>

</Project>
100 changes: 100 additions & 0 deletions samples/LiveStreamingServerNet.RtmpClientPublishDemo/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using LiveStreamingServerNet.Rtmp.Client;
using LiveStreamingServerNet.Rtmp.Client.Contracts;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Diagnostics;

namespace LiveStreamingServerNet.RtmpClientPublishDemo
{
public class Program
{
/// <summary>
/// Connect to rtmp://127.0.0.1/live/demo and publish the stream from input.flv.
/// </summary>
public static async Task Main()
{
var rtmpUrl = "rtmp://127.0.0.1/live/demo";
var parsedRtmpUrl = await RtmpUrlParser.ParseAsync(rtmpUrl);

var serverEndPoint = parsedRtmpUrl.ServerEndPoint;
var information = new Dictionary<string, object> { ["tcUrl"] = parsedRtmpUrl.TcUrl };
var appName = parsedRtmpUrl.AppName;
var streamName = parsedRtmpUrl.StreamName;

var rtmpClient = RtmpClientBuilder.Create()
.ConfigureLogging(options => options.AddConsole().SetMinimumLevel(LogLevel.Trace))
.Build();

var logger = rtmpClient.Services.GetRequiredService<ILogger<Program>>();

await rtmpClient.ConnectAsync(serverEndPoint, appName, information);

var rtmpStream = await rtmpClient.CreateStreamAsync();

await PublishStreamFromFlvAsync(streamName, rtmpClient, rtmpStream, logger);
}

private static async Task PublishStreamFromFlvAsync(string streamName, IRtmpClient rtmpClient, IRtmpStream rtmpStream, ILogger<Program> logger)
{
using var fileStream = new FileStream("input.flv", FileMode.Open, FileAccess.Read, FileShare.Read, 512 * 1024);
using var flvReader = new FlvReader(fileStream);

rtmpStream.OnUserControlEventReceived += (sender, e) =>
logger.LogInformation($"User control event received: {e.EventType}");

rtmpStream.OnStatusReceived += (sender, e) =>
logger.LogInformation($"Status received: {e.Code}");

rtmpStream.Publish.Publish(streamName);

await Task.WhenAny(SendMediaDataAsync(rtmpStream, flvReader), rtmpClient.UntilStoppedAsync());

async Task SendMediaDataAsync(IRtmpStream stream, FlvReader flvReader)
{
var header = await flvReader.ReadHeaderAsync();

if (header == null)
return;

(uint Timestamp, DateTime Time)? start = null;

while (true)
{
var tag = await flvReader.ReadTagAsync();

if (tag == null)
return;

try
{
if (tag.Header.TagType == FlvTagType.Audio)
{
await stream.Publish.SendAudioDataAsync(tag.Payload, tag.Header.Timestamp);
}
else if (tag.Header.TagType == FlvTagType.Video)
{
await stream.Publish.SendVideoDataAsync(tag.Payload, tag.Header.Timestamp);
}

if (!start.HasValue)
{
start = (tag.Header.Timestamp, DateTime.UtcNow);
}

var intendedTime = tag.Header.Timestamp - start.Value.Timestamp;
var elapsedTime = (DateTime.UtcNow - start.Value.Time).TotalMilliseconds;

var delay = intendedTime - elapsedTime;

if (delay > 0)
await Task.Delay((int)delay);
}
finally
{
tag.Payload.Unclaim();
}
}
}
}
}
}

0 comments on commit 0f2db0d

Please sign in to comment.