Skip to content

Commit

Permalink
refactor: timestamp handling
Browse files Browse the repository at this point in the history
  • Loading branch information
josephnhtam committed Oct 4, 2024
1 parent 734f0ba commit 72f140a
Show file tree
Hide file tree
Showing 14 changed files with 70 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public ValueTask<bool> HandleAsync(

try
{
subscribeStreamContext.ReceiveAudioData(new(rentedBuffer, chunkStreamContext.MessageHeader.Timestamp));
subscribeStreamContext.ReceiveAudioData(new(rentedBuffer, chunkStreamContext.Timestamp));

return ValueTask.FromResult(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public ValueTask<bool> HandleAsync(

try
{
subscribeStreamContext.ReceiveVideoData(new(rentedBuffer, chunkStreamContext.MessageHeader.Timestamp));
subscribeStreamContext.ReceiveVideoData(new(rentedBuffer, chunkStreamContext.Timestamp));

return ValueTask.FromResult(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private void SendCachedStreamMessages(

_mediaMessageCacher.SendCachedStreamMetaDataMessage(
streamContext.SubscribeContext, publishStreamContext,
chunkStreamContext.MessageHeader.Timestamp);
chunkStreamContext.Timestamp);

_mediaMessageCacher.SendCachedHeaderMessages(
streamContext.SubscribeContext, publishStreamContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private void BroadcastMetaDataToSubscribers(
_mediaMessageCacher.SendCachedStreamMetaDataMessage(
subscribeStreamContexts,
publishStreamContext,
chunkStreamContext.MessageHeader.Timestamp);
chunkStreamContext.Timestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ private async ValueTask BroadcastAudioMessageToSubscribersAsync(
IDataBuffer payloadBuffer,
IReadOnlyList<IRtmpSubscribeStreamContext> subscribeStreamContexts)
{
publishStreamContext.UpdateTimestamp(chunkStreamContext.MessageHeader.Timestamp, MediaType.Audio);
publishStreamContext.UpdateTimestamp(chunkStreamContext.Timestamp, MediaType.Audio);

await _mediaMessageBroadcaster.BroadcastMediaMessageAsync(
publishStreamContext,
subscribeStreamContexts,
MediaType.Audio,
chunkStreamContext.MessageHeader.Timestamp,
chunkStreamContext.Timestamp,
isSkippable,
payloadBuffer);
}
Expand Down Expand Up @@ -146,7 +146,7 @@ private async ValueTask HandleAudioPacketCachingAsync(
}
else if (publishStreamContext.GroupOfPicturesCacheActivated)
{
await _mediaMessageCacher.CachePictureAsync(publishStreamContext, MediaType.Audio, payloadBuffer, chunkStreamContext.MessageHeader.Timestamp);
await _mediaMessageCacher.CachePictureAsync(publishStreamContext, MediaType.Audio, payloadBuffer, chunkStreamContext.Timestamp);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ private async ValueTask BroadcastVideoMessageToSubscribersAsync(
IDataBuffer payloadBuffer,
IReadOnlyList<IRtmpSubscribeStreamContext> subscribeStreamContexts)
{
publishStreamContext.UpdateTimestamp(chunkStreamContext.MessageHeader.Timestamp, MediaType.Video);
publishStreamContext.UpdateTimestamp(chunkStreamContext.Timestamp, MediaType.Video);

await _mediaMessageBroadcaster.BroadcastMediaMessageAsync(
publishStreamContext,
subscribeStreamContexts,
MediaType.Video,
chunkStreamContext.MessageHeader.Timestamp,
chunkStreamContext.Timestamp,
isSkippable,
payloadBuffer);
}
Expand Down Expand Up @@ -160,7 +160,7 @@ private async ValueTask HandleVideoPacketCachingAsync(
}
else if (publishStreamContext.GroupOfPicturesCacheActivated && avcPacketType == AVCPacketType.NALU)
{
await _mediaMessageCacher.CachePictureAsync(publishStreamContext, MediaType.Video, payloadBuffer, chunkStreamContext.MessageHeader.Timestamp);
await _mediaMessageCacher.CachePictureAsync(publishStreamContext, MediaType.Video, payloadBuffer, chunkStreamContext.Timestamp);
}
}
else if (publishStreamContext.GroupOfPicturesCacheActivated)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ internal interface IRtmpChunkStreamContext
bool IsFirstChunkOfMessage { get; }
IRtmpChunkMessageHeaderContext MessageHeader { get; }
IDataBuffer? PayloadBuffer { get; set; }
uint Timestamp { get; set; }
}

internal interface IRtmpChunkMessageHeaderContext
{
uint Timestamp { get; set; }
uint TimestampDelta { get; set; }
int MessageLength { get; set; }
byte MessageTypeId { get; set; }
uint MessageStreamId { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ internal class RtmpChunkStreamContext : IRtmpChunkStreamContext
public bool IsFirstChunkOfMessage => PayloadBuffer == null;
public IRtmpChunkMessageHeaderContext MessageHeader { get; }
public IDataBuffer? PayloadBuffer { get; set; }
public uint Timestamp { get; set; }

public RtmpChunkStreamContext(uint chunkStreamId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public async ValueTask<RtmpChunkMessageAggregationResult> AggregateChunkMessages

var chunkStreamContext = contextProvider.GetChunkStreamContext(basicHeader.ChunkStreamId);

var success = basicHeader.ChunkType switch
var messageHeaderSize = basicHeader.ChunkType switch
{
0 => await HandleChunkMessageHeaderType0Async(chunkStreamContext, networkStream, headerBuffer, cancellationToken),
1 => await HandleChunkMessageHeaderType1Async(chunkStreamContext, networkStream, headerBuffer, cancellationToken),
Expand All @@ -37,117 +37,123 @@ public async ValueTask<RtmpChunkMessageAggregationResult> AggregateChunkMessages
_ => throw new ArgumentOutOfRangeException(nameof(basicHeader.ChunkType))
};

if (!success)
return new RtmpChunkMessageAggregationResult(false, 0, chunkStreamContext);

return await HandleChunkMessagePayloadAsync(
chunkStreamContext, networkStream, contextProvider.InChunkSize, headerBuffer.Size, cancellationToken);
chunkStreamContext, networkStream, contextProvider.InChunkSize, basicHeader.Size + messageHeaderSize, cancellationToken);
}
finally
{
_dataBufferPool.Recycle(headerBuffer);
}
}

private async ValueTask<bool> HandleChunkMessageHeaderType0Async(
private async ValueTask<int> HandleChunkMessageHeaderType0Async(
IRtmpChunkStreamContext chunkStreamContext,
INetworkStreamReader networkStream,
IDataBuffer dataBuffer,
CancellationToken cancellationToken)
{
var messageHeader = await RtmpChunkMessageHeaderType0.ReadAsync(dataBuffer, networkStream, cancellationToken);
var messageHeaderSize = messageHeader.Size;

chunkStreamContext.ChunkType = 0;
chunkStreamContext.MessageHeader.MessageLength = messageHeader.MessageLength;
chunkStreamContext.MessageHeader.MessageTypeId = messageHeader.MessageTypeId;
chunkStreamContext.MessageHeader.MessageStreamId = messageHeader.MessageStreamId;

chunkStreamContext.MessageHeader.TimestampDelta = 0;
chunkStreamContext.MessageHeader.MessageTypeId = messageHeader.MessageTypeId;
chunkStreamContext.MessageHeader.MessageLength = messageHeader.MessageLength;
chunkStreamContext.MessageHeader.Timestamp = messageHeader.Timestamp;
chunkStreamContext.MessageHeader.HasExtendedTimestamp = messageHeader.HasExtendedTimestamp();

if (chunkStreamContext.MessageHeader.HasExtendedTimestamp)
{
var extendedTimestampHeader = await RtmpChunkExtendedTimestampHeader.ReadAsync(dataBuffer, networkStream, cancellationToken);
messageHeaderSize += extendedTimestampHeader.Size;

chunkStreamContext.MessageHeader.Timestamp = extendedTimestampHeader.ExtendedTimestamp;
}
else
{
chunkStreamContext.MessageHeader.Timestamp = messageHeader.Timestamp;
}

return true;
chunkStreamContext.Timestamp = chunkStreamContext.MessageHeader.Timestamp;

return messageHeaderSize;
}

private async ValueTask<bool> HandleChunkMessageHeaderType1Async(
private async ValueTask<int> HandleChunkMessageHeaderType1Async(
IRtmpChunkStreamContext chunkStreamContext,
INetworkStreamReader networkStream,
IDataBuffer dataBuffer,
CancellationToken cancellationToken)
{
var messageHeader = await RtmpChunkMessageHeaderType1.ReadAsync(dataBuffer, networkStream, cancellationToken);
var messageHeaderSize = messageHeader.Size;

chunkStreamContext.ChunkType = 1;
chunkStreamContext.MessageHeader.MessageLength = messageHeader.MessageLength;
chunkStreamContext.MessageHeader.MessageTypeId = messageHeader.MessageTypeId;

chunkStreamContext.MessageHeader.MessageLength = messageHeader.MessageLength;
chunkStreamContext.MessageHeader.Timestamp = messageHeader.TimestampDelta;
chunkStreamContext.MessageHeader.HasExtendedTimestamp = messageHeader.HasExtendedTimestamp();

if (chunkStreamContext.MessageHeader.HasExtendedTimestamp)
{
var extendedTimestampHeader = await RtmpChunkExtendedTimestampHeader.ReadAsync(dataBuffer, networkStream, cancellationToken);
chunkStreamContext.MessageHeader.TimestampDelta = extendedTimestampHeader.ExtendedTimestamp;
}
else
{
chunkStreamContext.MessageHeader.TimestampDelta = messageHeader.TimestampDelta;
messageHeaderSize += extendedTimestampHeader.Size;

chunkStreamContext.MessageHeader.Timestamp = extendedTimestampHeader.ExtendedTimestamp;
}
chunkStreamContext.MessageHeader.Timestamp += chunkStreamContext.MessageHeader.TimestampDelta;

return true;
chunkStreamContext.Timestamp += chunkStreamContext.MessageHeader.Timestamp;

return messageHeaderSize;
}

private async ValueTask<bool> HandleChunkMessageHeaderType2Async(
private async ValueTask<int> HandleChunkMessageHeaderType2Async(
IRtmpChunkStreamContext chunkStreamContext,
INetworkStreamReader networkStream,
IDataBuffer dataBuffer,
CancellationToken cancellationToken)
{
var messageHeader = await RtmpChunkMessageHeaderType2.ReadAsync(dataBuffer, networkStream, cancellationToken);
var messageHeaderSize = messageHeader.Size;

chunkStreamContext.ChunkType = 2;

chunkStreamContext.MessageHeader.Timestamp = messageHeader.TimestampDelta;
chunkStreamContext.MessageHeader.HasExtendedTimestamp = messageHeader.HasExtendedTimestamp();

if (chunkStreamContext.MessageHeader.HasExtendedTimestamp)
{
var extendedTimestampHeader = await RtmpChunkExtendedTimestampHeader.ReadAsync(dataBuffer, networkStream, cancellationToken);
chunkStreamContext.MessageHeader.TimestampDelta = extendedTimestampHeader.ExtendedTimestamp;
}
else
{
chunkStreamContext.MessageHeader.TimestampDelta = messageHeader.TimestampDelta;
messageHeaderSize += extendedTimestampHeader.Size;

chunkStreamContext.MessageHeader.Timestamp = extendedTimestampHeader.ExtendedTimestamp;
}
chunkStreamContext.MessageHeader.Timestamp += chunkStreamContext.MessageHeader.TimestampDelta;

return true;
chunkStreamContext.Timestamp += chunkStreamContext.MessageHeader.Timestamp;

return messageHeaderSize;
}

private async ValueTask<bool> HandleChunkMessageHeaderType3Async(
private async ValueTask<int> HandleChunkMessageHeaderType3Async(
IRtmpChunkStreamContext chunkStreamContext,
INetworkStreamReader networkStream,
IDataBuffer dataBuffer,
CancellationToken cancellationToken)
{
int messageHeaderSize = 0;
var timestampDelta = chunkStreamContext.MessageHeader.Timestamp;

chunkStreamContext.ChunkType = 3;

var timestampDelta = chunkStreamContext.MessageHeader.HasExtendedTimestamp ?
(await RtmpChunkExtendedTimestampHeader.ReadAsync(dataBuffer, networkStream, cancellationToken)).ExtendedTimestamp :
chunkStreamContext.MessageHeader.TimestampDelta;
if (chunkStreamContext.MessageHeader.HasExtendedTimestamp)
{
var extendedTimestampHeader = await RtmpChunkExtendedTimestampHeader.ReadAsync(dataBuffer, networkStream, cancellationToken);
messageHeaderSize += extendedTimestampHeader.Size;

timestampDelta = extendedTimestampHeader.ExtendedTimestamp;
}

if (chunkStreamContext.IsFirstChunkOfMessage)
{
chunkStreamContext.MessageHeader.TimestampDelta = timestampDelta;
chunkStreamContext.MessageHeader.Timestamp += timestampDelta;
chunkStreamContext.Timestamp += timestampDelta;
}

return true;
return messageHeaderSize;
}

private async ValueTask<RtmpChunkMessageAggregationResult> HandleChunkMessagePayloadAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public async Task HandleAsync_Should_SendPlayStartAndCaches_If_AuthorizedAndStre
_clientContext.AppName.Returns(appName);

_chunkStreamContext.ChunkStreamId.Returns(chunkStreamId);
_chunkStreamContext.MessageHeader.Timestamp.Returns(timestamp);
_chunkStreamContext.Timestamp.Returns(timestamp);
_chunkStreamContext.MessageHeader.MessageStreamId.Returns(streamId);

_streamContext.StreamId.Returns(streamId);
Expand Down Expand Up @@ -219,7 +219,7 @@ internal async Task HandleAsync_Should_SendError_If_AuthorizedButStreamIsNotSubs
_clientContext.AppName.Returns(appName);

_chunkStreamContext.ChunkStreamId.Returns(chunkStreamId);
_chunkStreamContext.MessageHeader.Timestamp.Returns(timestamp);
_chunkStreamContext.Timestamp.Returns(timestamp);
_chunkStreamContext.MessageHeader.MessageStreamId.Returns(streamId);

_streamContext.StreamId.Returns(streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public async Task HandleAsync_Should_SendPlayStartAndCaches_If_AuthorizedAndStre
_clientContext.GetStreamContext(streamId).Returns(_streamContext);
_clientContext.AppName.Returns(appName);
_chunkStreamContext.ChunkStreamId.Returns(chunkStreamId);
_chunkStreamContext.MessageHeader.Timestamp.Returns(timestamp);
_chunkStreamContext.Timestamp.Returns(timestamp);
_chunkStreamContext.MessageHeader.MessageStreamId.Returns(streamId);
_streamContext.StreamId.Returns(streamId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ internal async Task HandleAsync_Should_HandleCacheAndBroadcastAndReturnTrue(
.CacheSequenceHeaderAsync(publisher_publishStreamContext, MediaType.Audio, _dataBuffer);

_ = _mediaMessageCacher.Received(gopCacheActivated && isPictureCachable ? 1 : 0)
.CachePictureAsync(publisher_publishStreamContext, MediaType.Audio, _dataBuffer, _chunkStreamContext.MessageHeader.Timestamp);
.CachePictureAsync(publisher_publishStreamContext, MediaType.Audio, _dataBuffer, _chunkStreamContext.Timestamp);

publisher_publishStreamContext.Received(1).UpdateTimestamp(_chunkStreamContext.MessageHeader.Timestamp, MediaType.Audio);
publisher_publishStreamContext.Received(1).UpdateTimestamp(_chunkStreamContext.Timestamp, MediaType.Audio);

await _mediaMessageBroadcaster.Received(1).BroadcastMediaMessageAsync(
publisher_publishStreamContext,
subscriber_subscribeStreamContexts,
MediaType.Audio,
_chunkStreamContext.MessageHeader.Timestamp,
_chunkStreamContext.Timestamp,
isSkippable,
_dataBuffer
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ avcPacketType is AVCPacketType.SequenceHeader &&
.CacheSequenceHeaderAsync(publisher_publishStreamContext, MediaType.Video, _dataBuffer);

_ = _mediaMessageCacher.Received(gopCacheActivated && isPictureCachable ? 1 : 0)
.CachePictureAsync(publisher_publishStreamContext, MediaType.Video, _dataBuffer, _chunkStreamContext.MessageHeader.Timestamp);
.CachePictureAsync(publisher_publishStreamContext, MediaType.Video, _dataBuffer, _chunkStreamContext.Timestamp);

publisher_publishStreamContext.Received(1).UpdateTimestamp(_chunkStreamContext.MessageHeader.Timestamp, MediaType.Video);
publisher_publishStreamContext.Received(1).UpdateTimestamp(_chunkStreamContext.Timestamp, MediaType.Video);

await _mediaMessageBroadcaster.Received(1).BroadcastMediaMessageAsync(
publisher_publishStreamContext,
subscriber_subscribeStreamContexts,
MediaType.Video,
_chunkStreamContext.MessageHeader.Timestamp,
_chunkStreamContext.Timestamp,
isSkippable,
_dataBuffer
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ internal async Task Handle_Should_HandleChunksCorrectly<TRtmpChunkMessageHeader>
else if (messageHeader is RtmpChunkMessageHeaderType1 headerType1)
{
streamContext.ChunkStreamId.Should().Be(basicHeader.ChunkStreamId);
streamContext.MessageHeader.TimestampDelta.Should().Be(headerType1.TimestampDelta);
streamContext.MessageHeader.Timestamp.Should().Be(headerType1.TimestampDelta);
streamContext.MessageHeader.MessageLength.Should().Be(headerType1.MessageLength);
streamContext.MessageHeader.MessageTypeId.Should().Be(headerType1.MessageTypeId);
streamContext.MessageHeader.HasExtendedTimestamp.Should().Be(headerType1.HasExtendedTimestamp());
}
else if (messageHeader is RtmpChunkMessageHeaderType2 headerType2)
{
streamContext.ChunkStreamId.Should().Be(basicHeader.ChunkStreamId);
streamContext.MessageHeader.TimestampDelta.Should().Be(headerType2.TimestampDelta);
streamContext.MessageHeader.Timestamp.Should().Be(headerType2.TimestampDelta);
streamContext.MessageHeader.HasExtendedTimestamp.Should().Be(headerType2.HasExtendedTimestamp());
}

Expand Down

0 comments on commit 72f140a

Please sign in to comment.