Skip to content

Commit

Permalink
Fixed concatenation of unbounded number of streams
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Dec 28, 2024
1 parent 781ad68 commit 83b41e5
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
16 changes: 14 additions & 2 deletions src/DotNext.IO/IO/SparseStream.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

Expand Down Expand Up @@ -207,9 +208,20 @@ protected override ReadOnlySpan<Stream> Streams
=> MemoryMarshal.CreateReadOnlySpan(in Unsafe.As<T, Stream>(ref Unsafe.AsRef(in streams)), streams.Length);
}

internal sealed class UnboundedSparseStream(ReadOnlySpan<Stream> streams, bool leaveOpen) : SparseStream(leaveOpen)
internal sealed class UnboundedSparseStream : SparseStream
{
private MemoryOwner<Stream> streams = streams.Copy();
private MemoryOwner<Stream> streams;

internal UnboundedSparseStream(Stream stream, ReadOnlySpan<Stream> streams, bool leaveOpen)
: base(leaveOpen)
{
Debug.Assert(streams.Length < int.MaxValue);

this.streams = Memory.AllocateExactly<Stream>(streams.Length + 1);
var output = this.streams.Span;
output[0] = stream;
streams.CopyTo(output.Slice(1));
}

protected override ReadOnlySpan<Stream> Streams => streams.Span;

Expand Down
5 changes: 4 additions & 1 deletion src/DotNext.IO/IO/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ private static Stream Combine(Stream stream, ReadOnlySpan<Stream> others, bool l
[var s1, var s2, var s3, var s4] => new SparseStream<(Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4), leaveOpen),
[var s1, var s2, var s3, var s4, var s5] => new SparseStream<(Stream, Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4,
s5), leaveOpen),
_ => new UnboundedSparseStream(others, leaveOpen),
[var s1, var s2, var s3, var s4, var s5, var s6] => new SparseStream<(Stream, Stream, Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4,
s5, s6), leaveOpen),
{ Length: int.MaxValue } => throw new InsufficientMemoryException(),
_ => new UnboundedSparseStream(stream, others, leaveOpen),
};

/// <summary>
Expand Down
30 changes: 30 additions & 0 deletions src/DotNext.Tests/IO/StreamExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,36 @@ public static void ReadBytesFromCombinedStream()
Equal(-1, combined.ReadByte());
}

[Theory]
[InlineData(2)]
[InlineData(3)]
[InlineData(4)]
[InlineData(5)]
[InlineData(6)]
[InlineData(7)]
[InlineData(8)]
[InlineData(9)]
[InlineData(10)]
public static void CombineManyStreams(byte streamCount)
{
using var stream = GetStreams(streamCount).Combine(leaveOpen: false);
var actual = new byte[streamCount];
stream.ReadExactly(actual);
var expected = Set.Range<byte, EnclosedEndpoint<byte>, DisclosedEndpoint<byte>>(0, streamCount);
Equal(expected, actual);

static IEnumerable<Stream> GetStreams(byte streamCount)
{
for (var i = 0; i < streamCount; i++)
{
var ms = new MemoryStream();
ms.WriteByte((byte)i);
ms.Seek(0L, SeekOrigin.Begin);
yield return ms;
}
}
}

[Fact]
public static async Task UnsupportedMethodsOfSparseStream()
{
Expand Down

0 comments on commit 83b41e5

Please sign in to comment.