Skip to content

Commit

Permalink
refactor: data buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
josephnhtam committed Oct 6, 2024
1 parent 59d76b3 commit 8c47074
Show file tree
Hide file tree
Showing 23 changed files with 286 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void WriteTag(FlvTagType tagType, uint timestamp, ReadOnlySpan<byte> payl
dataBuffer.Write(payload);
dataBuffer.WriteUInt32BigEndian(packageSize);

_streamWriter.Write(dataBuffer.UnderlyingBuffer.AsSpan(0, dataBuffer.Size));
_streamWriter.Write(dataBuffer.AsSpan());
}

public void Dispose()
Expand Down
4 changes: 1 addition & 3 deletions src/LiveStreamingServerNet.Flv/Internal/FlvWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ public async ValueTask WriteTagAsync(FlvTagType tagType, uint timestamp, Action<
var header = new FlvTagHeader(tagType, payloadSize, timestamp);
header.Write(dataBuffer.MoveTo(0));

await _streamWriter.WriteAsync(
new ArraySegment<byte>(dataBuffer.UnderlyingBuffer, 0, dataBuffer.Size),
cancellationToken);
await _streamWriter.WriteAsync(dataBuffer.AsMemory(), cancellationToken);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ private static void ProcessExVideoHeader(
[MethodImpl(MethodImplOptions.AggressiveInlining)]
static void RemovePadding(IDataBuffer payloadBuffer, VideoPacketType packetType, int size)
{
payloadBuffer.UnderlyingBuffer.AsSpan(size).CopyTo(payloadBuffer.UnderlyingBuffer.AsSpan());
payloadBuffer.Size -= size;
payloadBuffer.TrimStart(size);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public GroupOfPicturesCache(IBufferPool? bufferPool)

public void Add(PictureCacheInfo info, IDataBuffer buffer)
{
_cache.Write(info, buffer.UnderlyingBuffer.AsSpan(0, buffer.Size));
_cache.Write(info, buffer.AsSpan());
}

public void Clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ internal static unsafe class CRC32
0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
};

public static uint Generate(byte[] data, int offset, int length)
public static uint Generate(Span<byte> data)
{
var dataLength = data.Length;

unchecked
{
fixed (byte* dataPtr = data)
Expand All @@ -79,7 +81,7 @@ public static uint Generate(byte[] data, int offset, int length)
{
uint crc = 0xffffffff;

for (int pos = offset; pos < offset + length; pos++)
for (int pos = 0; pos < dataLength; pos++)
{
var index = ((crc >> 24) ^ *(dataPtr + pos)) & 0xff;
crc = (crc << 8) ^ *(tablePtr + index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,20 @@ private void WriteTable(IDataBuffer dataBuffer)
}
}

private record struct TableChecksum(byte[] Buffer, int Start, int Length)
private ref struct TableChecksum
{
public const int Size = 4;

private readonly Span<byte> _buffer;

public TableChecksum(Span<byte> buffer)
{
_buffer = buffer;
}

public void Write(IDataBuffer dataBuffer)
{
var checksum = CRC32.Generate(Buffer, Start, Length);
var checksum = CRC32.Generate(_buffer);
dataBuffer.WriteUInt32BigEndian(checksum);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void WritePSIPacket(IDataBuffer tsBuffer, ushort packetID, byte tableID,

var checksumStart = psiStartPosition + psiHeader.ChecksumOffset;
var checksumLength = tsBuffer.Position - checksumStart;
var checksum = new TableChecksum(tsBuffer.UnderlyingBuffer, checksumStart, checksumLength);
var checksum = new TableChecksum(tsBuffer.AsSpan(checksumStart, checksumLength));
checksum.Write(tsBuffer);

var remainingSize = TsConstants.TsPacketSize - (tsBuffer.Position - startPosition);
Expand Down Expand Up @@ -347,11 +347,11 @@ private async ValueTask FlushBuffersAsync(string path)

if (_flushedCount == 0)
{
await fileStream.WriteAsync(_headerBuffer.UnderlyingBuffer.AsMemory(0, _headerBuffer.Size));
await fileStream.WriteAsync(_headerBuffer.AsMemory(0, _headerBuffer.Size));
_headerBuffer.Reset();
}

await fileStream.WriteAsync(_payloadBuffer.UnderlyingBuffer.AsMemory(0, _payloadBuffer.Size));
await fileStream.WriteAsync(_payloadBuffer.AsMemory(0, _payloadBuffer.Size));
_payloadBuffer.Reset();

_flushedCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@ public interface IDataBuffer : IDisposable
int Position { get; set; }
int Size { get; set; }

byte[] UnderlyingBuffer { get; }
Span<byte> AsSpan();
Span<byte> AsSpan(int offset);
Span<byte> AsSpan(int offset, int length);
Memory<byte> AsMemory();
Memory<byte> AsMemory(int offset);
Memory<byte> AsMemory(int offset, int length);
ArraySegment<byte> AsSegment();
ArraySegment<byte> AsSegment(int offset);
ArraySegment<byte> AsSegment(int offset, int length);

void TrimStart(int count);
void Advance(int count);
IDataBuffer MoveTo(int position);
void Reset();
Expand Down Expand Up @@ -39,7 +48,7 @@ public interface IDataBuffer : IDisposable
ushort ReadUInt16BigEndian();
uint ReadUInt24BigEndian();
uint ReadUInt32BigEndian();
short ReadInt16BiEndian();
short ReadInt16BigEndian();
int ReadInt24BigEndian();
int ReadInt32BigEndian();

Expand Down
47 changes: 27 additions & 20 deletions src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public partial class DataBuffer
public void ReadBytes(byte[] buffer, int index, int count)
{
var targetSpan = buffer.AsSpan(index, count);
_buffer.AsSpan(_position, count).CopyTo(targetSpan);
_buffer.AsSpan(_startIndex + _position, count).CopyTo(targetSpan);
Advance(count);
}

Expand All @@ -21,108 +21,115 @@ public byte[] ReadBytes(int count)

public byte ReadByte()
{
return _buffer[_position++];
return _buffer[_startIndex + _position++];
}

public short ReadInt16()
{
var value = Unsafe.As<byte, short>(ref _buffer[_position]);
var value = Unsafe.As<byte, short>(ref _buffer[_startIndex + _position]);
_position += 2;
return value;
}

public int ReadInt32()
{
var value = Unsafe.As<byte, int>(ref _buffer[_position]);
var value = Unsafe.As<byte, int>(ref _buffer[_startIndex + _position]);
_position += 4;
return value;
}

public long ReadInt64()
{
var value = Unsafe.As<byte, long>(ref _buffer[_position]);
var value = Unsafe.As<byte, long>(ref _buffer[_startIndex + _position]);
_position += 8;
return value;
}

public ushort ReadUInt16()
{
var value = Unsafe.As<byte, ushort>(ref _buffer[_position]);
var value = Unsafe.As<byte, ushort>(ref _buffer[_startIndex + _position]);
_position += 2;
return value;
}

public uint ReadUInt32()
{
var value = Unsafe.As<byte, uint>(ref _buffer[_position]);
var value = Unsafe.As<byte, uint>(ref _buffer[_startIndex + _position]);
_position += 4;
return value;
}

public ulong ReadUInt64()
{
var value = Unsafe.As<byte, ulong>(ref _buffer[_position]);
var value = Unsafe.As<byte, ulong>(ref _buffer[_startIndex + _position]);
_position += 8;
return value;
}

public float ReadSingle()
{
var value = Unsafe.As<byte, float>(ref _buffer[_position]);
var value = Unsafe.As<byte, float>(ref _buffer[_startIndex + _position]);
_position += 4;
return value;
}

public double ReadDouble()
{
var value = Unsafe.As<byte, double>(ref _buffer[_position]);
var value = Unsafe.As<byte, double>(ref _buffer[_startIndex + _position]);
_position += 8;
return value;
}

public bool ReadBoolean()
{
return Unsafe.As<byte, bool>(ref _buffer[_position++]);
return Unsafe.As<byte, bool>(ref _buffer[_startIndex + _position++]);
}

public char ReadChar()
{
var value = Unsafe.As<byte, char>(ref _buffer[_position]);
var value = Unsafe.As<byte, char>(ref _buffer[_startIndex + _position]);
_position += 2;
return value;
}

public ushort ReadUInt16BigEndian()
{
var value = BinaryPrimitives.ReverseEndianness(Unsafe.As<byte, ushort>(ref _buffer[_position]));
var value = BinaryPrimitives.ReverseEndianness(Unsafe.As<byte, ushort>(ref _buffer[_startIndex + _position]));
_position += 2;
return value;
}

public uint ReadUInt24BigEndian()
{
var value = (uint)(_buffer[_position] << 16 | _buffer[_position + 1] << 8 | _buffer[_position + 2]);
var value =
_buffer[_startIndex + _position] << 16 |
_buffer[_startIndex + _position + 1] << 8 |
_buffer[_startIndex + _position + 2];

_position += 3;
return value;
return (uint)value;
}

public uint ReadUInt32BigEndian()
{
var value = BinaryPrimitives.ReverseEndianness(Unsafe.As<byte, uint>(ref _buffer[_position]));
var value = BinaryPrimitives.ReverseEndianness(Unsafe.As<byte, uint>(ref _buffer[_startIndex + _position]));
_position += 4;
return value;
}

public short ReadInt16BiEndian()
public short ReadInt16BigEndian()
{
var value = BinaryPrimitives.ReverseEndianness(Unsafe.As<byte, short>(ref _buffer[_position]));
var value = BinaryPrimitives.ReverseEndianness(Unsafe.As<byte, short>(ref _buffer[_startIndex + _position]));
_position += 2;
return value;
}

public int ReadInt24BigEndian()
{
var value = _buffer[_position] << 16 | _buffer[_position + 1] << 8 | _buffer[_position + 2];
var value =
_buffer[_startIndex + _position] << 16 |
_buffer[_startIndex + _position + 1] << 8 |
_buffer[_startIndex + _position + 2];

if ((value & 0x800000) != 0)
value |= unchecked((int)0xff000000);
Expand All @@ -133,7 +140,7 @@ public int ReadInt24BigEndian()

public int ReadInt32BigEndian()
{
var value = BinaryPrimitives.ReverseEndianness(Unsafe.As<byte, int>(ref _buffer[_position]));
var value = BinaryPrimitives.ReverseEndianness(Unsafe.As<byte, int>(ref _buffer[_startIndex + _position]));
_position += 4;
return value;
}
Expand Down
28 changes: 13 additions & 15 deletions src/LiveStreamingServerNet.Utilities/Buffers/DataBuffer.Write.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,55 +13,55 @@ public void WriteRandomBytes(int count)
Advance(count);

for (int i = 0; i < count; i++)
_buffer[pos + i] = (byte)RandomNumberGenerator.GetInt32(0, 255);
_buffer[_startIndex + pos + i] = (byte)RandomNumberGenerator.GetInt32(0, 255);
}

public void Write(Memory<byte> memory)
{
var pos = _position;
Advance(memory.Length);

memory.Span.CopyTo(_buffer.AsSpan(pos));
memory.Span.CopyTo(_buffer.AsSpan(_startIndex + pos));
}

public void Write(byte[] buffer)
{
var pos = _position;
Advance(buffer.Length);

buffer.AsSpan().CopyTo(_buffer.AsSpan(pos));
buffer.AsSpan().CopyTo(_buffer.AsSpan(_startIndex + pos));
}

public void Write(byte[] buffer, int offset, int count)
{
var pos = _position;
Advance(count);

buffer.AsSpan(offset, count).CopyTo(_buffer.AsSpan(pos));
buffer.AsSpan(offset, count).CopyTo(_buffer.AsSpan(_startIndex + pos));
}

public void Write(ReadOnlySpan<byte> buffer)
{
var pos = _position;
Advance(buffer.Length);

buffer.CopyTo(_buffer.AsSpan(pos));
buffer.CopyTo(_buffer.AsSpan(_startIndex + pos));
}

public void Write(byte value)
{
var pos = _position;
Advance(1);

_buffer[pos] = value;
_buffer[_startIndex + pos] = value;
}

private void WriteUnaligned<T>(T value, int size)
{
var pos = _position;
Advance(size);

var targetSpan = _buffer.AsSpan(pos);
var targetSpan = _buffer.AsSpan(_startIndex + pos);
Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(targetSpan), value);
}

Expand Down Expand Up @@ -125,10 +125,9 @@ public void WriteUInt24BigEndian(uint value)
var pos = _position;
Advance(3);

_position = pos;
Write((byte)(value >> 16));
Write((byte)(value >> 8));
Write((byte)(value));
_buffer[_startIndex + pos] = (byte)(value >> 16);
_buffer[_startIndex + pos + 1] = (byte)(value >> 8);
_buffer[_startIndex + pos + 2] = (byte)value;
}

public void WriteUInt32BigEndian(uint value)
Expand All @@ -146,10 +145,9 @@ public void WriteInt24BigEndian(int value)
var pos = _position;
Advance(3);

_position = pos;
Write((byte)(value >> 16));
Write((byte)(value >> 8));
Write((byte)(value));
_buffer[_startIndex + pos] = (byte)(value >> 16);
_buffer[_startIndex + pos + 1] = (byte)(value >> 8);
_buffer[_startIndex + pos + 2] = (byte)value;
}

public void WriteInt32BigEndian(int value)
Expand Down
Loading

0 comments on commit 8c47074

Please sign in to comment.