From 8c47074ae57f06e4b7d2cd1cecc3342395da5507 Mon Sep 17 00:00:00 2001 From: Joseph Tam Date: Sun, 6 Oct 2024 16:27:53 +0800 Subject: [PATCH] refactor: data buffer --- .../FlvWriter.cs | 2 +- .../Internal/FlvWriter.cs | 4 +- .../Media/RtmpVideoMessageHandler.cs | 3 +- .../Internal/RtmpStreamContext.cs | 2 +- .../Internal/Containers/CRC32.cs | 6 +- .../Internal/Containers/TsMuxer.Models.cs | 11 +- .../Internal/Containers/TsMuxer.cs | 6 +- .../Buffers/Contracts/IDataBuffer.cs | 13 +- .../Buffers/DataBuffer.Read.cs | 47 +++--- .../Buffers/DataBuffer.Write.cs | 28 ++-- .../Buffers/DataBuffer.cs | 134 ++++++++++++++++-- .../Buffers/DataBufferPool.cs | 1 - .../DataBuffer.Read.Test.cs | 2 +- .../DataBuffer.Test.cs | 51 ++++++- .../DataBuffer.Write.Test.cs | 42 +++--- .../FlvWriter.Test.cs | 2 +- .../Services/FlvMediaTagSenderService.Test.cs | 2 +- .../RtmpCommandMessageSenderService.Test.cs | 6 +- .../RtmpMediaMessageCacherService.Test.cs | 14 +- .../RtmpProtocolControlService.Test.cs | 15 +- ...tmpUserControlMessageSenderService.Test.cs | 12 +- .../RtmpChunkMessageAggregatorService.Test.cs | 4 +- .../RtmpChunkMessageWriterService.Test.cs | 6 +- 23 files changed, 286 insertions(+), 127 deletions(-) diff --git a/samples/LiveStreamingServerNet.RtmpClientPlayDemo/FlvWriter.cs b/samples/LiveStreamingServerNet.RtmpClientPlayDemo/FlvWriter.cs index ef1af559..0144d7ca 100644 --- a/samples/LiveStreamingServerNet.RtmpClientPlayDemo/FlvWriter.cs +++ b/samples/LiveStreamingServerNet.RtmpClientPlayDemo/FlvWriter.cs @@ -38,7 +38,7 @@ public void WriteTag(FlvTagType tagType, uint timestamp, ReadOnlySpan payl dataBuffer.Write(payload); dataBuffer.WriteUInt32BigEndian(packageSize); - _streamWriter.Write(dataBuffer.UnderlyingBuffer.AsSpan(0, dataBuffer.Size)); + _streamWriter.Write(dataBuffer.AsSpan()); } public void Dispose() diff --git a/src/LiveStreamingServerNet.Flv/Internal/FlvWriter.cs b/src/LiveStreamingServerNet.Flv/Internal/FlvWriter.cs index 06b840c8..3f6673cd 100644 --- a/src/LiveStreamingServerNet.Flv/Internal/FlvWriter.cs +++ b/src/LiveStreamingServerNet.Flv/Internal/FlvWriter.cs @@ -66,9 +66,7 @@ public async ValueTask WriteTagAsync(FlvTagType tagType, uint timestamp, Action< var header = new FlvTagHeader(tagType, payloadSize, timestamp); header.Write(dataBuffer.MoveTo(0)); - await _streamWriter.WriteAsync( - new ArraySegment(dataBuffer.UnderlyingBuffer, 0, dataBuffer.Size), - cancellationToken); + await _streamWriter.WriteAsync(dataBuffer.AsMemory(), cancellationToken); } finally { diff --git a/src/LiveStreamingServerNet.Rtmp.Server/Internal/RtmpEventHandlers/Media/RtmpVideoMessageHandler.cs b/src/LiveStreamingServerNet.Rtmp.Server/Internal/RtmpEventHandlers/Media/RtmpVideoMessageHandler.cs index 9e643ab7..a4692e44 100644 --- a/src/LiveStreamingServerNet.Rtmp.Server/Internal/RtmpEventHandlers/Media/RtmpVideoMessageHandler.cs +++ b/src/LiveStreamingServerNet.Rtmp.Server/Internal/RtmpEventHandlers/Media/RtmpVideoMessageHandler.cs @@ -172,8 +172,7 @@ private static void ProcessExVideoHeader( [MethodImpl(MethodImplOptions.AggressiveInlining)] static void RemovePadding(IDataBuffer payloadBuffer, VideoPacketType packetType, int size) { - payloadBuffer.UnderlyingBuffer.AsSpan(size).CopyTo(payloadBuffer.UnderlyingBuffer.AsSpan()); - payloadBuffer.Size -= size; + payloadBuffer.TrimStart(size); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/src/LiveStreamingServerNet.Rtmp.Server/Internal/RtmpStreamContext.cs b/src/LiveStreamingServerNet.Rtmp.Server/Internal/RtmpStreamContext.cs index c41cc440..49d5022f 100644 --- a/src/LiveStreamingServerNet.Rtmp.Server/Internal/RtmpStreamContext.cs +++ b/src/LiveStreamingServerNet.Rtmp.Server/Internal/RtmpStreamContext.cs @@ -170,7 +170,7 @@ public GroupOfPicturesCache(IBufferPool? bufferPool) public void Add(PictureCacheInfo info, IDataBuffer buffer) { - _cache.Write(info, buffer.UnderlyingBuffer.AsSpan(0, buffer.Size)); + _cache.Write(info, buffer.AsSpan()); } public void Clear() diff --git a/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/CRC32.cs b/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/CRC32.cs index 84389069..d902ccae 100644 --- a/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/CRC32.cs +++ b/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/CRC32.cs @@ -69,8 +69,10 @@ internal static unsafe class CRC32 0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4 }; - public static uint Generate(byte[] data, int offset, int length) + public static uint Generate(Span data) { + var dataLength = data.Length; + unchecked { fixed (byte* dataPtr = data) @@ -79,7 +81,7 @@ public static uint Generate(byte[] data, int offset, int length) { uint crc = 0xffffffff; - for (int pos = offset; pos < offset + length; pos++) + for (int pos = 0; pos < dataLength; pos++) { var index = ((crc >> 24) ^ *(dataPtr + pos)) & 0xff; crc = (crc << 8) ^ *(tablePtr + index); diff --git a/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/TsMuxer.Models.cs b/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/TsMuxer.Models.cs index 7547f680..f3a4e083 100644 --- a/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/TsMuxer.Models.cs +++ b/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/TsMuxer.Models.cs @@ -296,13 +296,20 @@ private void WriteTable(IDataBuffer dataBuffer) } } - private record struct TableChecksum(byte[] Buffer, int Start, int Length) + private ref struct TableChecksum { public const int Size = 4; + private readonly Span _buffer; + + public TableChecksum(Span buffer) + { + _buffer = buffer; + } + public void Write(IDataBuffer dataBuffer) { - var checksum = CRC32.Generate(Buffer, Start, Length); + var checksum = CRC32.Generate(_buffer); dataBuffer.WriteUInt32BigEndian(checksum); } } diff --git a/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/TsMuxer.cs b/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/TsMuxer.cs index be88b547..882c05b1 100644 --- a/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/TsMuxer.cs +++ b/src/LiveStreamingServerNet.StreamProcessor/Internal/Containers/TsMuxer.cs @@ -126,7 +126,7 @@ private void WritePSIPacket(IDataBuffer tsBuffer, ushort packetID, byte tableID, var checksumStart = psiStartPosition + psiHeader.ChecksumOffset; var checksumLength = tsBuffer.Position - checksumStart; - var checksum = new TableChecksum(tsBuffer.UnderlyingBuffer, checksumStart, checksumLength); + var checksum = new TableChecksum(tsBuffer.AsSpan(checksumStart, checksumLength)); checksum.Write(tsBuffer); var remainingSize = TsConstants.TsPacketSize - (tsBuffer.Position - startPosition); @@ -347,11 +347,11 @@ private async ValueTask FlushBuffersAsync(string path) if (_flushedCount == 0) { - await fileStream.WriteAsync(_headerBuffer.UnderlyingBuffer.AsMemory(0, _headerBuffer.Size)); + await fileStream.WriteAsync(_headerBuffer.AsMemory(0, _headerBuffer.Size)); _headerBuffer.Reset(); } - await fileStream.WriteAsync(_payloadBuffer.UnderlyingBuffer.AsMemory(0, _payloadBuffer.Size)); + await fileStream.WriteAsync(_payloadBuffer.AsMemory(0, _payloadBuffer.Size)); _payloadBuffer.Reset(); _flushedCount++; diff --git a/src/LiveStreamingServerNet.Utilities/Buffers/Contracts/IDataBuffer.cs b/src/LiveStreamingServerNet.Utilities/Buffers/Contracts/IDataBuffer.cs index a60b00f3..bd970d85 100644 --- a/src/LiveStreamingServerNet.Utilities/Buffers/Contracts/IDataBuffer.cs +++ b/src/LiveStreamingServerNet.Utilities/Buffers/Contracts/IDataBuffer.cs @@ -5,8 +5,17 @@ public interface IDataBuffer : IDisposable int Position { get; set; } int Size { get; set; } - byte[] UnderlyingBuffer { get; } + Span AsSpan(); + Span AsSpan(int offset); + Span AsSpan(int offset, int length); + Memory AsMemory(); + Memory AsMemory(int offset); + Memory AsMemory(int offset, int length); + ArraySegment AsSegment(); + ArraySegment AsSegment(int offset); + ArraySegment AsSegment(int offset, int length); + void TrimStart(int count); void Advance(int count); IDataBuffer MoveTo(int position); void Reset(); @@ -39,7 +48,7 @@ public interface IDataBuffer : IDisposable ushort ReadUInt16BigEndian(); uint ReadUInt24BigEndian(); uint ReadUInt32BigEndian(); - short ReadInt16BiEndian(); + short ReadInt16BigEndian(); int ReadInt24BigEndian(); int ReadInt32BigEndian(); diff --git a/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Read.cs b/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Read.cs index 3a8debfb..757466c0 100644 --- a/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Read.cs +++ b/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Read.cs @@ -8,7 +8,7 @@ public partial class DataBuffer public void ReadBytes(byte[] buffer, int index, int count) { var targetSpan = buffer.AsSpan(index, count); - _buffer.AsSpan(_position, count).CopyTo(targetSpan); + _buffer.AsSpan(_startIndex + _position, count).CopyTo(targetSpan); Advance(count); } @@ -21,108 +21,115 @@ public byte[] ReadBytes(int count) public byte ReadByte() { - return _buffer[_position++]; + return _buffer[_startIndex + _position++]; } public short ReadInt16() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 2; return value; } public int ReadInt32() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 4; return value; } public long ReadInt64() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 8; return value; } public ushort ReadUInt16() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 2; return value; } public uint ReadUInt32() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 4; return value; } public ulong ReadUInt64() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 8; return value; } public float ReadSingle() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 4; return value; } public double ReadDouble() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 8; return value; } public bool ReadBoolean() { - return Unsafe.As(ref _buffer[_position++]); + return Unsafe.As(ref _buffer[_startIndex + _position++]); } public char ReadChar() { - var value = Unsafe.As(ref _buffer[_position]); + var value = Unsafe.As(ref _buffer[_startIndex + _position]); _position += 2; return value; } public ushort ReadUInt16BigEndian() { - var value = BinaryPrimitives.ReverseEndianness(Unsafe.As(ref _buffer[_position])); + var value = BinaryPrimitives.ReverseEndianness(Unsafe.As(ref _buffer[_startIndex + _position])); _position += 2; return value; } public uint ReadUInt24BigEndian() { - var value = (uint)(_buffer[_position] << 16 | _buffer[_position + 1] << 8 | _buffer[_position + 2]); + var value = + _buffer[_startIndex + _position] << 16 | + _buffer[_startIndex + _position + 1] << 8 | + _buffer[_startIndex + _position + 2]; + _position += 3; - return value; + return (uint)value; } public uint ReadUInt32BigEndian() { - var value = BinaryPrimitives.ReverseEndianness(Unsafe.As(ref _buffer[_position])); + var value = BinaryPrimitives.ReverseEndianness(Unsafe.As(ref _buffer[_startIndex + _position])); _position += 4; return value; } - public short ReadInt16BiEndian() + public short ReadInt16BigEndian() { - var value = BinaryPrimitives.ReverseEndianness(Unsafe.As(ref _buffer[_position])); + var value = BinaryPrimitives.ReverseEndianness(Unsafe.As(ref _buffer[_startIndex + _position])); _position += 2; return value; } public int ReadInt24BigEndian() { - var value = _buffer[_position] << 16 | _buffer[_position + 1] << 8 | _buffer[_position + 2]; + var value = + _buffer[_startIndex + _position] << 16 | + _buffer[_startIndex + _position + 1] << 8 | + _buffer[_startIndex + _position + 2]; if ((value & 0x800000) != 0) value |= unchecked((int)0xff000000); @@ -133,7 +140,7 @@ public int ReadInt24BigEndian() public int ReadInt32BigEndian() { - var value = BinaryPrimitives.ReverseEndianness(Unsafe.As(ref _buffer[_position])); + var value = BinaryPrimitives.ReverseEndianness(Unsafe.As(ref _buffer[_startIndex + _position])); _position += 4; return value; } diff --git a/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Write.cs b/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Write.cs index 1098534d..87f130e4 100644 --- a/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Write.cs +++ b/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Write.cs @@ -13,7 +13,7 @@ public void WriteRandomBytes(int count) Advance(count); for (int i = 0; i < count; i++) - _buffer[pos + i] = (byte)RandomNumberGenerator.GetInt32(0, 255); + _buffer[_startIndex + pos + i] = (byte)RandomNumberGenerator.GetInt32(0, 255); } public void Write(Memory memory) @@ -21,7 +21,7 @@ public void Write(Memory memory) var pos = _position; Advance(memory.Length); - memory.Span.CopyTo(_buffer.AsSpan(pos)); + memory.Span.CopyTo(_buffer.AsSpan(_startIndex + pos)); } public void Write(byte[] buffer) @@ -29,7 +29,7 @@ public void Write(byte[] buffer) var pos = _position; Advance(buffer.Length); - buffer.AsSpan().CopyTo(_buffer.AsSpan(pos)); + buffer.AsSpan().CopyTo(_buffer.AsSpan(_startIndex + pos)); } public void Write(byte[] buffer, int offset, int count) @@ -37,7 +37,7 @@ public void Write(byte[] buffer, int offset, int count) var pos = _position; Advance(count); - buffer.AsSpan(offset, count).CopyTo(_buffer.AsSpan(pos)); + buffer.AsSpan(offset, count).CopyTo(_buffer.AsSpan(_startIndex + pos)); } public void Write(ReadOnlySpan buffer) @@ -45,7 +45,7 @@ public void Write(ReadOnlySpan buffer) var pos = _position; Advance(buffer.Length); - buffer.CopyTo(_buffer.AsSpan(pos)); + buffer.CopyTo(_buffer.AsSpan(_startIndex + pos)); } public void Write(byte value) @@ -53,7 +53,7 @@ public void Write(byte value) var pos = _position; Advance(1); - _buffer[pos] = value; + _buffer[_startIndex + pos] = value; } private void WriteUnaligned(T value, int size) @@ -61,7 +61,7 @@ private void WriteUnaligned(T value, int size) var pos = _position; Advance(size); - var targetSpan = _buffer.AsSpan(pos); + var targetSpan = _buffer.AsSpan(_startIndex + pos); Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(targetSpan), value); } @@ -125,10 +125,9 @@ public void WriteUInt24BigEndian(uint value) var pos = _position; Advance(3); - _position = pos; - Write((byte)(value >> 16)); - Write((byte)(value >> 8)); - Write((byte)(value)); + _buffer[_startIndex + pos] = (byte)(value >> 16); + _buffer[_startIndex + pos + 1] = (byte)(value >> 8); + _buffer[_startIndex + pos + 2] = (byte)value; } public void WriteUInt32BigEndian(uint value) @@ -146,10 +145,9 @@ public void WriteInt24BigEndian(int value) var pos = _position; Advance(3); - _position = pos; - Write((byte)(value >> 16)); - Write((byte)(value >> 8)); - Write((byte)(value)); + _buffer[_startIndex + pos] = (byte)(value >> 16); + _buffer[_startIndex + pos + 1] = (byte)(value >> 8); + _buffer[_startIndex + pos + 2] = (byte)value; } public void WriteInt32BigEndian(int value) diff --git a/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.cs b/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.cs index 29a95dc1..5e7e1de9 100644 --- a/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.cs +++ b/src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.cs @@ -1,6 +1,5 @@ using LiveStreamingServerNet.Utilities.Buffers.Contracts; using System.Buffers; -using System.Diagnostics; using System.Runtime.CompilerServices; namespace LiveStreamingServerNet.Utilities.Buffers @@ -8,9 +7,12 @@ namespace LiveStreamingServerNet.Utilities.Buffers public partial class DataBuffer : IDataBuffer { private readonly IBufferPool? _bufferPool; + private byte[] _buffer; + private int _startIndex; private int _position; private int _size; + private bool _isDisposed; public int Position @@ -35,8 +37,6 @@ public int Size public int Capacity => _buffer.Length; - public byte[] UnderlyingBuffer => _buffer; - public DataBuffer() : this(1024) { } public DataBuffer(int initialCapacity) : this(null, initialCapacity) { } @@ -50,27 +50,54 @@ public DataBuffer(IBufferPool? bufferPool, int initialCapacity) [MethodImpl(MethodImplOptions.AggressiveInlining)] private void EnsureCapacity(int capacity) { + if ((_startIndex + capacity) < Capacity) + return; + if (capacity < Capacity) + { + _buffer.AsSpan(_startIndex, _size).CopyTo(_buffer); + _startIndex = 0; return; + } byte[] buffer; if (_bufferPool != null) { buffer = _bufferPool.Rent(capacity); - _buffer.AsSpan().CopyTo(buffer); + _buffer.AsSpan(_startIndex, _size).CopyTo(buffer); _bufferPool.Return(_buffer); } else { buffer = ArrayPool.Shared.Rent(capacity); - _buffer.AsSpan().CopyTo(buffer); + _buffer.AsSpan(_startIndex, _size).CopyTo(buffer); ArrayPool.Shared.Return(_buffer); } + _startIndex = 0; _buffer = buffer; } + public void TrimStart(int count) + { + if (count < 0) + throw new ArgumentOutOfRangeException(nameof(count)); + + if (count == 0) + return; + + if (count >= _size) + { + Reset(); + return; + } + + _startIndex += count; + _size -= count; + _position -= count; + } + public IDataBuffer MoveTo(int position) { Position = position; @@ -85,31 +112,32 @@ public void Advance(int count) public void Reset() { + _startIndex = 0; _position = 0; _size = 0; } public async Task FlushAsync(Stream output) { - await output.WriteAsync(_buffer, 0, Size); + await output.WriteAsync(_buffer, _startIndex, _size); Reset(); } public void Flush(IDataBuffer output) { - output.Write(_buffer, 0, Size); + output.Write(_buffer, _startIndex, _size); Reset(); } public void Flush(Stream output) { - output.Write(_buffer, 0, Size); + output.Write(_buffer.AsSpan(_startIndex, _size)); Reset(); } public void CopyAllTo(IDataBuffer targetBuffer) { - targetBuffer.Write(_buffer, 0, Size); + targetBuffer.Write(_buffer, _startIndex, _size); } public void ReadAndWriteTo(IDataBuffer targetBuffer, int bytesCount) @@ -117,12 +145,13 @@ public void ReadAndWriteTo(IDataBuffer targetBuffer, int bytesCount) if (_position + bytesCount > _size) throw new ArgumentOutOfRangeException(nameof(bytesCount)); - targetBuffer.Write(_buffer, _position, bytesCount); + targetBuffer.Write(_buffer, _startIndex + _position, bytesCount); _position += bytesCount; } public ValueTask FromStreamData(Stream stream, int bytesCount, CancellationToken cancellationToken = default) { + _startIndex = 0; _position = 0; Size = bytesCount; return stream.ReadExactlyAsync(_buffer, 0, bytesCount, cancellationToken); @@ -132,11 +161,12 @@ public ValueTask AppendStreamData(Stream stream, int bytesCount, CancellationTok { var pos = _position; Advance(bytesCount); - return stream.ReadExactlyAsync(_buffer, pos, bytesCount, cancellationToken); + return stream.ReadExactlyAsync(_buffer, _startIndex + pos, bytesCount, cancellationToken); } public ValueTask FromStreamData(IStreamReader streamReader, int bytesCount, CancellationToken cancellationToken = default) { + _startIndex = 0; _position = 0; Size = bytesCount; return streamReader.ReadExactlyAsync(_buffer, 0, bytesCount, cancellationToken); @@ -146,16 +176,22 @@ public ValueTask AppendStreamData(IStreamReader streamReader, int bytesCount, Ca { var pos = _position; Advance(bytesCount); - return streamReader.ReadExactlyAsync(_buffer, pos, bytesCount, cancellationToken); + return streamReader.ReadExactlyAsync(_buffer, _startIndex + pos, bytesCount, cancellationToken); } public IRentedBuffer ToRentedBuffer(int offset, int size, int initialClaim = 1) { + if (offset < 0) + throw new ArgumentOutOfRangeException(nameof(offset)); + + if (size < 0) + throw new ArgumentOutOfRangeException(nameof(size)); + if (offset + size > _size) throw new ArgumentOutOfRangeException(nameof(size)); var rentedBuffer = new RentedBuffer(_bufferPool, size, initialClaim); - _buffer.AsSpan(offset, size).CopyTo(rentedBuffer.Buffer); + _buffer.AsSpan(_startIndex + offset, size).CopyTo(rentedBuffer.Buffer); return rentedBuffer; } @@ -165,6 +201,78 @@ public IRentedBuffer ToRentedBuffer(int initialClaim = 1) return ToRentedBuffer(0, _size, initialClaim); } + public Span AsSpan() + { + return _buffer.AsSpan(_startIndex, _size); + } + + public Span AsSpan(int offset) + { + return _buffer.AsSpan(_startIndex + offset, _size); + } + + public Span AsSpan(int offset, int length) + { + if (offset < 0) + throw new ArgumentOutOfRangeException(nameof(offset)); + + if (length < 0) + throw new ArgumentOutOfRangeException(nameof(length)); + + if (offset + length > _size) + throw new ArgumentOutOfRangeException(nameof(length)); + + return _buffer.AsSpan(_startIndex + offset, length); + } + + public Memory AsMemory() + { + return _buffer.AsMemory(_startIndex, _size); + } + + public Memory AsMemory(int offset) + { + return _buffer.AsMemory(_startIndex + offset, _size); + } + + public Memory AsMemory(int offset, int length) + { + if (offset < 0) + throw new ArgumentOutOfRangeException(nameof(offset)); + + if (length < 0) + throw new ArgumentOutOfRangeException(nameof(length)); + + if (offset + length > _size) + throw new ArgumentOutOfRangeException(nameof(length)); + + return _buffer.AsMemory(_startIndex + offset, length); + } + + public ArraySegment AsSegment() + { + return new ArraySegment(_buffer, _startIndex, _size); + } + + public ArraySegment AsSegment(int offset) + { + return new ArraySegment(_buffer, _startIndex + offset, _size); + } + + public ArraySegment AsSegment(int offset, int length) + { + if (offset < 0) + throw new ArgumentOutOfRangeException(nameof(offset)); + + if (length < 0) + throw new ArgumentOutOfRangeException(nameof(length)); + + if (offset + length > _size) + throw new ArgumentOutOfRangeException(nameof(length)); + + return new ArraySegment(_buffer, _startIndex + offset, length); + } + public virtual void Dispose() { if (_isDisposed) diff --git a/src/LiveStreamingServerNet.Utilities/Buffers/DataBufferPool.cs b/src/LiveStreamingServerNet.Utilities/Buffers/DataBufferPool.cs index 2477cce8..d52a3ad5 100644 --- a/src/LiveStreamingServerNet.Utilities/Buffers/DataBufferPool.cs +++ b/src/LiveStreamingServerNet.Utilities/Buffers/DataBufferPool.cs @@ -43,7 +43,6 @@ public IDataBuffer Obtain() public void Recycle(IDataBuffer dataBuffer) { - Debug.Assert(dataBuffer.UnderlyingBuffer != null); _pool.Recycle(dataBuffer); } } diff --git a/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Read.Test.cs b/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Read.Test.cs index d1a9be3c..608dd19d 100644 --- a/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Read.Test.cs +++ b/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Read.Test.cs @@ -359,7 +359,7 @@ public void ReadInt16BiEndian() buffer.MoveTo(startPos).Write(expectedBytes); // Act - var result = buffer.MoveTo(startPos).ReadInt16BiEndian(); + var result = buffer.MoveTo(startPos).ReadInt16BigEndian(); // Assert var endPos = buffer.Position; diff --git a/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Test.cs b/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Test.cs index 6e5b567f..0718530e 100644 --- a/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Test.cs +++ b/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Test.cs @@ -93,7 +93,7 @@ public void FlushToDataBuffer() // Assert srcDataBuffer.Position.Should().Be(0); - var result = dstDataBuffer.UnderlyingBuffer.Take(expected.Length); + var result = dstDataBuffer.AsSpan(0, expected.Length).ToArray(); result.Should().BeEquivalentTo(expected); } @@ -157,7 +157,7 @@ public void CopyAllTo() // Assert srcDataBuffer.Position.Should().Be(expectedPos); - var result = dstDataBuffer.UnderlyingBuffer.Take(expected.Length); + var result = dstDataBuffer.AsSpan(0, expected.Length).ToArray(); result.Should().BeEquivalentTo(expected); } @@ -181,7 +181,7 @@ public void ReadAndWriteTo() // Assert srcDataBuffer.Position.Should().Be(expectedEndPos); - var result = dstDataBuffer.UnderlyingBuffer.Take(expected.Length); + var result = dstDataBuffer.AsSpan(0, expected.Length).ToArray(); result.Should().BeEquivalentTo(expected); } @@ -202,7 +202,7 @@ public async Task FromStreamData() var size = dataBuffer.Size; size.Should().Be(expected.Length); - var result = dataBuffer.UnderlyingBuffer.Take(expected.Length); + var result = dataBuffer.AsSpan(0, expected.Length).ToArray(); result.Should().BeEquivalentTo(expected); } @@ -223,8 +223,49 @@ public async Task AppendStreamData() var size = dataBuffer.Size; size.Should().Be(startPos + expected.Length); - var result = dataBuffer.UnderlyingBuffer.Skip(startPos).Take(expected.Length); + var result = dataBuffer.AsSpan(startPos, expected.Length).ToArray(); result.Should().BeEquivalentTo(expected); } + + [Theory] + [InlineData(10, 0)] + [InlineData(10, 5)] + [InlineData(10, 8)] + public void TrimStart(int bufferSize, int trimCount) + { + // Arrange + var buffer = _fixture.CreateMany(bufferSize).ToArray(); + + using var dataBuffer = new DataBuffer(); + dataBuffer.Write(buffer); + + // Act + dataBuffer.TrimStart(trimCount); + + // Assert + dataBuffer.AsSpan().ToArray().Should().BeEquivalentTo(buffer.AsSpan(trimCount).ToArray()); + } + + [Theory] + [InlineData(8, 32, 0, 8)] + [InlineData(8, 32, 16, 8)] + [InlineData(8, 32, 16, 16)] + [InlineData(8, 32, 16, 32)] + public void TrimStartAndWrite(int initialCapacity, int bufferSize, int trimCount, int secondBufferSize) + { + // Arrange + var buffer = _fixture.CreateMany(bufferSize).ToArray(); + var secondBuffer = _fixture.CreateMany(secondBufferSize).ToArray(); + + using var dataBuffer = new DataBuffer(initialCapacity); + dataBuffer.Write(buffer); + + // Act + dataBuffer.TrimStart(trimCount); + dataBuffer.Write(secondBuffer); + + // Assert + dataBuffer.AsSpan().ToArray().Should().BeEquivalentTo(buffer.AsSpan(trimCount).ToArray().Concat(secondBuffer)); + } } } diff --git a/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Write.Test.cs b/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Write.Test.cs index 7151390e..aa7c4e05 100644 --- a/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Write.Test.cs +++ b/test/LIveStreamingServerNet.Utilities.Test/DataBuffer.Write.Test.cs @@ -29,7 +29,7 @@ public void WriteByte() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 1); - var result = dataBuffer.UnderlyingBuffer[startPos]; + var result = dataBuffer.AsSpan()[startPos]; result.Should().Be(expected); } @@ -49,7 +49,7 @@ public void WriteInt16() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 2); - var result = BitConverter.ToInt16(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToInt16(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -69,7 +69,7 @@ public void WriteInt32() // Assert endPos.Should().Be(startPos + 4); - var result = BitConverter.ToInt32(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToInt32(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -89,7 +89,7 @@ public void WriteInt64() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 8); - var result = BitConverter.ToInt64(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToInt64(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -109,7 +109,7 @@ public void WriteUInt16() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 2); - var result = BitConverter.ToUInt16(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToUInt16(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -129,7 +129,7 @@ public void WriteUInt32() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 4); - var result = BitConverter.ToUInt32(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToUInt32(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -149,7 +149,7 @@ public void WriteUInt64() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 8); - var result = BitConverter.ToUInt64(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToUInt64(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -169,7 +169,7 @@ public void WriteSingle() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 4); - var result = BitConverter.ToSingle(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToSingle(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -189,7 +189,7 @@ public void WriteDouble() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 8); - var result = BitConverter.ToDouble(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToDouble(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -209,7 +209,7 @@ public void WriteBool() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 1); - var result = BitConverter.ToBoolean(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToBoolean(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -229,7 +229,7 @@ public void WriteChar() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 2); - var result = BitConverter.ToChar(dataBuffer.UnderlyingBuffer, startPos); + var result = BitConverter.ToChar(dataBuffer.AsSpan(startPos)); result.Should().Be(expected); } @@ -249,7 +249,7 @@ public void WriteUInt16BigEndian() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 2); - var result = BitConverter.ToUInt16(dataBuffer.UnderlyingBuffer.Skip(startPos).Take(2).Reverse().ToArray(), 0); + var result = BitConverter.ToUInt16(dataBuffer.AsSpan(startPos, 2).ToArray().Reverse().ToArray(), 0); result.Should().Be(expected); } @@ -269,7 +269,7 @@ public void WriteUInt24BigEndian() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 3); - var buffer = dataBuffer.UnderlyingBuffer.AsSpan(startPos, 3); + var buffer = dataBuffer.AsSpan(startPos, 3).ToArray(); var result = (uint)(buffer[0] << 16 | buffer[1] << 8 | buffer[2]); result.Should().Be(expected); } @@ -290,7 +290,7 @@ public void WriteUInt32BigEndian() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 4); - var result = BitConverter.ToUInt32(dataBuffer.UnderlyingBuffer.Skip(startPos).Take(4).Reverse().ToArray(), 0); + var result = BitConverter.ToUInt32(dataBuffer.AsSpan(startPos, 4).ToArray().Reverse().ToArray(), 0); result.Should().Be(expected); } @@ -310,7 +310,7 @@ public void WriteInt16BigEndian() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 2); - var result = BitConverter.ToInt16(dataBuffer.UnderlyingBuffer.Skip(startPos).Take(2).Reverse().ToArray(), 0); + var result = BitConverter.ToInt16(dataBuffer.AsSpan(startPos, 2).ToArray().Reverse().ToArray(), 0); result.Should().Be(expected); } @@ -327,7 +327,7 @@ public void WriteInt24BigEndian() dataBuffer.WriteInt24BigEndian(expected); // Assert - var buffer = dataBuffer.UnderlyingBuffer.AsSpan(startPos, 3); + var buffer = dataBuffer.AsSpan(startPos, 3).ToArray(); var result = buffer[0] << 16 | buffer[1] << 8 | buffer[2]; result.Should().Be(expected); } @@ -348,7 +348,7 @@ public void WriteInt32BigEndian() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 4); - var result = BitConverter.ToInt32(dataBuffer.UnderlyingBuffer.Skip(startPos).Take(4).Reverse().ToArray(), 0); + var result = BitConverter.ToInt32(dataBuffer.AsSpan(startPos, 4).ToArray().Reverse().ToArray(), 0); result.Should().Be(expected); } @@ -368,7 +368,7 @@ public void WriteBytes() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + expected.Length); - var result = dataBuffer.UnderlyingBuffer.Skip(startPos).Take(expected.Length); + var result = dataBuffer.AsSpan(startPos, expected.Length).ToArray(); result.Should().BeEquivalentTo(expected); } @@ -389,7 +389,7 @@ public void WriteSlice() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + 50); - var result = dataBuffer.UnderlyingBuffer.Skip(startPos).Take(50); + var result = dataBuffer.AsSpan(startPos, 50).ToArray(); result.Should().BeEquivalentTo(expected); } @@ -409,7 +409,7 @@ public void WriteSpan() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + expected.Length); - var result = dataBuffer.UnderlyingBuffer.Skip(startPos).Take(expected.Length); + var result = dataBuffer.AsSpan(startPos, expected.Length).ToArray(); result.Should().BeEquivalentTo(expected); } @@ -429,7 +429,7 @@ public void WriteMemory() var endPos = dataBuffer.Position; endPos.Should().Be(startPos + expected.Length); - var result = dataBuffer.UnderlyingBuffer.Skip(startPos).Take(expected.Length); + var result = dataBuffer.AsSpan(startPos, expected.Length).ToArray(); result.Should().BeEquivalentTo(expected); } } diff --git a/test/LiveStreamingServerNet.Flv.Test/FlvWriter.Test.cs b/test/LiveStreamingServerNet.Flv.Test/FlvWriter.Test.cs index a0f679ea..afc05e34 100644 --- a/test/LiveStreamingServerNet.Flv.Test/FlvWriter.Test.cs +++ b/test/LiveStreamingServerNet.Flv.Test/FlvWriter.Test.cs @@ -81,7 +81,7 @@ private static bool VerifyTagBytes(ReadOnlyMemory bytes, FlvTagType tagTyp dataBuffer.Write(payload); dataBuffer.WriteUInt32BigEndian((uint)dataBuffer.Size); - return bytes.ToArray().SequenceEqual(dataBuffer.UnderlyingBuffer.Take(dataBuffer.Size).ToArray()); + return bytes.ToArray().SequenceEqual(dataBuffer.AsSpan().ToArray()); } } } diff --git a/test/LiveStreamingServerNet.Flv.Test/Services/FlvMediaTagSenderService.Test.cs b/test/LiveStreamingServerNet.Flv.Test/Services/FlvMediaTagSenderService.Test.cs index af1c6da4..0595d14a 100644 --- a/test/LiveStreamingServerNet.Flv.Test/Services/FlvMediaTagSenderService.Test.cs +++ b/test/LiveStreamingServerNet.Flv.Test/Services/FlvMediaTagSenderService.Test.cs @@ -46,7 +46,7 @@ public async Task SendMediaTagAsync_Should_SendMediaTag(MediaType mediaType) // Assert await _flvClient.Received(1).WriteTagAsync(expectedFlvType, timestamp, Arg.Any>(), Arg.Any()); - dataBuffer.UnderlyingBuffer.Take(dataBuffer.Size).ToArray().Should().BeEquivalentTo(payloadBuffer.Take(payloadSize).ToArray()); + dataBuffer.AsSpan().ToArray().Should().BeEquivalentTo(payloadBuffer.Take(payloadSize).ToArray()); } } } diff --git a/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpCommandMessageSenderService.Test.cs b/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpCommandMessageSenderService.Test.cs index 407c9ca2..c292b980 100644 --- a/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpCommandMessageSenderService.Test.cs +++ b/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpCommandMessageSenderService.Test.cs @@ -96,7 +96,7 @@ public void SendCommandMessage_Should_SendCommandMessage() Arg.Any>(), callback); - dataBuffer.UnderlyingBuffer.Take(dataBuffer.Size).Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + dataBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } [Fact] @@ -168,7 +168,7 @@ await _commandMessageSender.SendCommandMessageAsync( Arg.Any>(), Arg.Any>()); - dataBuffer.UnderlyingBuffer.Take(dataBuffer.Size).Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + dataBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } [Fact] @@ -237,7 +237,7 @@ public void SendCommandMessageAsync_Should_BroadcastCommandMessages() ), Arg.Any>()); - dataBuffer.UnderlyingBuffer.Take(dataBuffer.Size).Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + dataBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } private static List GetParameters(string commandName, double transactionId, diff --git a/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpMediaMessageCacherService.Test.cs b/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpMediaMessageCacherService.Test.cs index 7da29e22..a08c4e19 100644 --- a/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpMediaMessageCacherService.Test.cs +++ b/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpMediaMessageCacherService.Test.cs @@ -226,7 +226,7 @@ public void SendCachedHeaderMessages_Should_SendAudioSequenceHeader_When_AudioSe Arg.Is(x => x.MessageTypeId == RtmpMessageType.AudioMessage), Arg.Any>()); - audioBuffer.UnderlyingBuffer.Take(audioBuffer.Size).Should().BeEquivalentTo(audioSequenceHeader); + audioBuffer.AsSpan().ToArray().Should().BeEquivalentTo(audioSequenceHeader); } [Fact] @@ -264,12 +264,12 @@ public void SendCachedHeaderMessages_Should_SendVideoSequenceHeader_When_VideoSe // Assert _chunkMessageSender.Received(1).Send( - subscriber_clientContext, - Arg.Is(x => x.ChunkType == 0 && x.ChunkStreamId == videoChunkStreamId), - Arg.Is(x => x.MessageTypeId == RtmpMessageType.VideoMessage), - Arg.Any>()); + subscriber_clientContext, + Arg.Is(x => x.ChunkType == 0 && x.ChunkStreamId == videoChunkStreamId), + Arg.Is(x => x.MessageTypeId == RtmpMessageType.VideoMessage), + Arg.Any>()); - videoBuffer.UnderlyingBuffer.Take(videoBuffer.Size).Should().BeEquivalentTo(videoSequenceHeader); + videoBuffer.AsSpan().ToArray().Should().BeEquivalentTo(videoSequenceHeader); } [Fact] @@ -423,7 +423,7 @@ public void SendCachedGroupOfPictures_Should_SendGroupOfPictures() Arg.Any(), Arg.Any>()); - payloadsBuffer.UnderlyingBuffer.Take(payloadsBuffer.Size).Should().BeEquivalentTo(expectedPayloadsBufffer); + payloadsBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedPayloadsBufffer); } } } diff --git a/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpProtocolControlService.Test.cs b/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpProtocolControlService.Test.cs index fa9a6c1f..47efba8b 100644 --- a/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpProtocolControlService.Test.cs +++ b/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpProtocolControlService.Test.cs @@ -70,8 +70,7 @@ public void SetChunkSize_Should_SendChunkSizeMessageAndSetOutChunkSize() Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); _clientContext.Received(1).OutChunkSize = outChunkSize; } @@ -104,8 +103,7 @@ public void AbortMessage_Should_SendAbortMessage() Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } [Fact] @@ -136,8 +134,7 @@ public void Acknowledgement_Should_SendAcknowledgementMessage() Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } [Fact] @@ -168,8 +165,7 @@ public void WindowAcknowledgementSize_Should_SendWindowAcknowledgementSizeMessag Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); _clientContext.Received(1).OutWindowAcknowledgementSize = size; } @@ -204,8 +200,7 @@ public void SetPeerBandwidth_Should_SendSetPeerBandwidthMessage() Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } } } diff --git a/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpUserControlMessageSenderService.Test.cs b/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpUserControlMessageSenderService.Test.cs index 6fa2b692..99757a1b 100644 --- a/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpUserControlMessageSenderService.Test.cs +++ b/test/LiveStreamingServerNet.Rtmp.Server.Test/Services/RtmpUserControlMessageSenderService.Test.cs @@ -91,8 +91,7 @@ public void SendStreamBeginMessage_Should_SendStreamBeginMessage() Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } @@ -125,8 +124,7 @@ public void SendStreamBeginMessage_Should_BroadcastStreamBeginMessage() Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } [Fact] @@ -159,8 +157,7 @@ public void SendStreamEofMessage_Should_SendStreamEofMessage() Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } [Fact] @@ -192,8 +189,7 @@ public void SendStreamEofMessage_Should_BroadcastStreamEofMessage() Arg.Any>() ); - _payloadBuffer.UnderlyingBuffer.Take(_payloadBuffer.Size) - .Should().BeEquivalentTo(expectedBuffer.UnderlyingBuffer.Take(expectedBuffer.Size)); + _payloadBuffer.AsSpan().ToArray().Should().BeEquivalentTo(expectedBuffer.AsSpan().ToArray()); } } } diff --git a/test/LiveStreamingServerNet.Rtmp.Test/Services/RtmpChunkMessageAggregatorService.Test.cs b/test/LiveStreamingServerNet.Rtmp.Test/Services/RtmpChunkMessageAggregatorService.Test.cs index 1595e812..83105a92 100644 --- a/test/LiveStreamingServerNet.Rtmp.Test/Services/RtmpChunkMessageAggregatorService.Test.cs +++ b/test/LiveStreamingServerNet.Rtmp.Test/Services/RtmpChunkMessageAggregatorService.Test.cs @@ -72,7 +72,7 @@ internal async Task AggregateChunkMessagesAsync_Should_AggregateChunksCorrectly< chunkStreamContext.Timestamp.Should().Be(initialTimestamp + chunkStreamContext.MessageHeader.Timestamp); } - chunkStreamContext.PayloadBuffer!.UnderlyingBuffer.Take(chunkStreamContext.PayloadBuffer!.Size) + chunkStreamContext.PayloadBuffer!.AsSpan(0, chunkStreamContext.PayloadBuffer!.Size).ToArray() .Should().BeEquivalentTo(payload); } @@ -196,7 +196,7 @@ private static MemoryStream CreateStream using var tempBuffer = new DataBuffer(); writer.Write(tempBuffer, basicHeader, messageHeader, payloadBuffer, chunkSize); - return new MemoryStream(tempBuffer.UnderlyingBuffer.Take(tempBuffer.Size).ToArray()); + return new MemoryStream(tempBuffer.AsSpan().ToArray()); } } } diff --git a/test/LiveStreamingServerNet.Rtmp.Test/Services/RtmpChunkMessageWriterService.Test.cs b/test/LiveStreamingServerNet.Rtmp.Test/Services/RtmpChunkMessageWriterService.Test.cs index 8cac9a0b..636be1ec 100644 --- a/test/LiveStreamingServerNet.Rtmp.Test/Services/RtmpChunkMessageWriterService.Test.cs +++ b/test/LiveStreamingServerNet.Rtmp.Test/Services/RtmpChunkMessageWriterService.Test.cs @@ -43,7 +43,7 @@ public async Task Write_Should_SeparateChunkType0MessageIntoChunks(uint timestam service.Write(streamBuffer, basicHeader, messageHeader, payloadBuffer.MoveTo(0), (uint)expectedChunkSize); // Assert - await using var stream = new NetworkStream(new MemoryStream(streamBuffer.UnderlyingBuffer)); + await using var stream = new NetworkStream(new MemoryStream(streamBuffer.AsSpan().ToArray())); using var targetBuffer = new DataBuffer(); var remainingPayloadSize = expectedPayload.Length; @@ -84,7 +84,7 @@ async Task AssertFirstChunk(INetworkStreamReader stream) using var tempBuffer = new DataBuffer(); await tempBuffer.FromStreamData(stream, Math.Min(expectedChunkSize, remainingPayloadSize)); - targetBuffer.Write(tempBuffer.UnderlyingBuffer, 0, tempBuffer.Size); + targetBuffer.Write(tempBuffer.AsSpan()); remainingPayloadSize -= expectedChunkSize; } @@ -105,7 +105,7 @@ async Task AssertRemainingChunk(INetworkStreamReader stream) using var tempBuffer = new DataBuffer(); await tempBuffer.FromStreamData(stream, Math.Min(expectedChunkSize, remainingPayloadSize)); - targetBuffer.Write(tempBuffer.UnderlyingBuffer, 0, tempBuffer.Size); + targetBuffer.Write(tempBuffer.AsSpan()); remainingPayloadSize -= expectedChunkSize; }