Skip to content

Commit

Permalink
Added ability to define lifetime of the wrapped streams (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Dec 28, 2024
1 parent 1c04819 commit 817e0e0
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 21 deletions.
46 changes: 41 additions & 5 deletions src/DotNext.IO/IO/SparseStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace DotNext.IO;
/// <remarks>
/// The stream is available for read-only operations.
/// </remarks>
internal abstract class SparseStream : Stream, IFlushable
internal abstract class SparseStream(bool leaveOpen) : Stream, IFlushable
{
private int runningIndex;

Expand Down Expand Up @@ -177,24 +177,60 @@ public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int co

/// <inheritdoc/>
public sealed override void EndWrite(IAsyncResult asyncResult) => throw new InvalidOperationException();

protected override void Dispose(bool disposing)
{
if (disposing && !leaveOpen)
{
Disposable.Dispose(Streams);
}
}

public override async ValueTask DisposeAsync()
{
for (var i = 0; i < Streams.Length; i++)
{
await Streams[i].DisposeAsync().ConfigureAwait(false);
}

GC.SuppressFinalize(this);
}
}

internal sealed class SparseStream<T>(T streams) : SparseStream
internal sealed class SparseStream<T>(T streams, bool leaveOpen) : SparseStream(leaveOpen)
where T : struct, ITuple
{
protected override ReadOnlySpan<Stream> Streams
=> MemoryMarshal.CreateReadOnlySpan(in Unsafe.As<T, Stream>(ref Unsafe.AsRef(in streams)), streams.Length);
}

internal sealed class UnboundedSparseStream(ReadOnlySpan<Stream> streams) : SparseStream
internal sealed class UnboundedSparseStream(ReadOnlySpan<Stream> streams, bool leaveOpen) : SparseStream(leaveOpen)
{
private MemoryOwner<Stream> streams = streams.Copy();

protected override ReadOnlySpan<Stream> Streams => streams.Span;

protected override void Dispose(bool disposing)
{
streams.Dispose();
base.Dispose(disposing);
try
{
base.Dispose(disposing);
}
finally
{
streams.Dispose();
}
}

public override async ValueTask DisposeAsync()
{
try
{
await base.DisposeAsync().ConfigureAwait(false);
}
finally
{
streams.Dispose();
}
}
}
35 changes: 20 additions & 15 deletions src/DotNext.IO/IO/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,48 @@ internal static void ThrowIfEmpty<T>(in Memory<T> buffer, [CallerArgumentExpress
throw new ArgumentException(ExceptionMessages.BufferTooSmall, expression);
}

private static Stream Combine(Stream stream, ReadOnlySpan<Stream> others, bool leaveOpen)
=> others switch
{
[] => stream,
[var s] => new SparseStream<(Stream, Stream)>((stream, s), leaveOpen),
[var s1, var s2] => new SparseStream<(Stream, Stream, Stream)>((stream, s1, s2), leaveOpen),
[var s1, var s2, var s3] => new SparseStream<(Stream, Stream, Stream, Stream)>((stream, s1, s2, s3), leaveOpen),
[var s1, var s2, var s3, var s4] => new SparseStream<(Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4), leaveOpen),
[var s1, var s2, var s3, var s4, var s5] => new SparseStream<(Stream, Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4,
s5), leaveOpen),
_ => new UnboundedSparseStream(others, leaveOpen),
};

/// <summary>
/// Combines multiple readable streams.
/// </summary>
/// <param name="stream">The stream to combine.</param>
/// <param name="others">A collection of streams.</param>
/// <returns>An object that represents multiple streams as one logical stream.</returns>
public static Stream Combine(this Stream stream, ReadOnlySpan<Stream> others) // TODO: Use params in future
=> others switch
{
[] => stream,
[var s] => new SparseStream<(Stream, Stream)>((stream, s)),
[var s1, var s2] => new SparseStream<(Stream, Stream, Stream)>((stream, s1, s2)),
[var s1, var s2, var s3] => new SparseStream<(Stream, Stream, Stream, Stream)>((stream, s1, s2, s3)),
[var s1, var s2, var s3, var s4] => new SparseStream<(Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4)),
[var s1, var s2, var s3, var s4, var s5] => new SparseStream<(Stream, Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4,
s5)),
_ => new UnboundedSparseStream(others.ToArray()),
};
=> Combine(stream, others, leaveOpen: true);

/// <summary>
/// Combines multiple readable streams.
/// </summary>
/// <param name="streams">A collection of streams.</param>
/// <param name="leaveOpen"><see langword="true"/> to keep the wrapped streams alive when combined stream disposed; otherwise, <see langword="false"/>.</param>
/// <returns>An object that represents multiple streams as one logical stream.</returns>
/// <exception cref="ArgumentException"><paramref name="streams"/> is empty.</exception>
public static Stream Combine(this ReadOnlySpan<Stream> streams)
public static Stream Combine(this ReadOnlySpan<Stream> streams, bool leaveOpen = true)
=> streams is [var first, .. var rest]
? Combine(first, rest)
? Combine(first, rest, leaveOpen)
: throw new ArgumentException(ExceptionMessages.BufferTooSmall, nameof(streams));

/// <summary>
/// Combines multiple readable streams.
/// </summary>
/// <param name="streams">A collection of streams.</param>
/// <param name="leaveOpen"><see langword="true"/> to keep the wrapped streams alive when combined stream disposed; otherwise, <see langword="false"/>.</param>
/// <returns>An object that represents multiple streams as one logical stream.</returns>
/// <exception cref="ArgumentException"><paramref name="streams"/> is empty.</exception>
public static Stream Combine(this IEnumerable<Stream> streams)
public static Stream Combine(this IEnumerable<Stream> streams, bool leaveOpen = true)
{
// Use buffer to allocate streams on the stack
var buffer = new StreamBuffer();
Expand All @@ -67,7 +72,7 @@ public static Stream Combine(this IEnumerable<Stream> streams)
try
{
writer.AddAll(streams);
result = Combine(writer.WrittenSpan);
result = Combine(writer.WrittenSpan, leaveOpen);
}
finally
{
Expand Down
3 changes: 2 additions & 1 deletion src/DotNext/Disposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public static async ValueTask DisposeAsync(IEnumerable<IAsyncDisposable?> object
/// Disposes many objects in safe manner.
/// </summary>
/// <param name="objects">An array of objects to dispose.</param>
public static void Dispose(ReadOnlySpan<IDisposable?> objects)
public static void Dispose<T>(ReadOnlySpan<T> objects)
where T : IDisposable?
{
foreach (var obj in objects)
obj?.Dispose();
Expand Down

0 comments on commit 817e0e0

Please sign in to comment.