Skip to content

Commit

Permalink
Always use non-threadpool threads for blocking process IO.
Browse files Browse the repository at this point in the history
fix #94
  • Loading branch information
madelson committed Mar 6, 2023
1 parent c58acf1 commit b0c1041
Show file tree
Hide file tree
Showing 18 changed files with 624 additions and 420 deletions.
2 changes: 1 addition & 1 deletion MedallionShell.Tests/GeneralTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ public async Task TestDefaultProcessStreamIsUsedWithCustomInputEncodingOnNewerFr
.GetValue(command.StandardInput);
#if NETCOREAPP
command.Process.StandardInput.Encoding.ShouldEqual(command.StandardInput.Encoding);
if (command.StandardInput.BaseStream is CompatibilityStandardInputWrapperStream)
if (command.StandardInput.BaseStream is ProcessStreamWrapper)
{
Assert.AreNotSame(command.Process.StandardInput, innerWriter);
}
Expand Down
2 changes: 1 addition & 1 deletion MedallionShell.Tests/PackageSetUpTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void TestLibrariesUseConfigureAwaitFalse()
Assert.IsNotEmpty(codeFiles);

var awaitRegex = new Regex(@"//.*|(?<await>\bawait\s)");
var configureAwaitRegex = new Regex(@"\.ConfigureAwait\(false\)|\.TryAwait\(\)");
var configureAwaitRegex = new Regex(@"\.ConfigureAwait\(false\)");
foreach (var codeFile in codeFiles)
{
var code = File.ReadAllText(codeFile);
Expand Down
68 changes: 68 additions & 0 deletions MedallionShell.Tests/ThreadUsageTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.NetworkInformation;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Medallion.Shell.Tests;

[NonParallelizable] // performs global ThreadPool configuration
internal class ThreadUsageTest
{
/// <summary>
/// Tests the fix to https://github.com/madelson/MedallionShell/issues/94; prior to this change this test
/// would fail for small-ish thread pool values (both 2 and 8 on my machine, for example).
/// </summary>
[Test]
public void TestPipeline([Values(2, 8)] int minThreads)
{
const int ProcessCount = 10;

ThreadPool.GetMinThreads(out var originalMinWorkerThreads, out var originalMinCompletionPortThreads);
ThreadPool.GetMaxThreads(out var originalMaxWorkerThreads, out var originalMaxCompletionPortThreads);
Command? pipeline = null;
try
{
ThreadPool.SetMinThreads(minThreads, minThreads);
ThreadPool.SetMaxThreads(minThreads, minThreads);

var task = Task.Factory.StartNew(
() =>
{
pipeline = Enumerable.Range(0, ProcessCount)
.Select(_ => UnitTestHelpers.TestShell.Run(UnitTestHelpers.SampleCommand, "pipebytes"))
.Aggregate((first, second) => first | second);
for (var i = 0; i < 10; ++i)
{
var @char = (char)('a' + i);
pipeline.StandardInput.AutoFlush.ShouldEqual(true);
var writeTask = pipeline.StandardInput.WriteAsync(@char);
writeTask.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, $"write {i} should complete");
var buffer = new char[10];
var readTask = pipeline.StandardOutput.ReadAsync(buffer, 0, buffer.Length);
readTask.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, $"read {i} should complete");
readTask.Result.ShouldEqual(1);
buffer[0].ShouldEqual(@char);
}
pipeline.StandardInput.Dispose();
pipeline.Task.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, "pipeline should exit");
},
TaskCreationOptions.LongRunning
);
Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10)));
}
finally
{
ThreadPool.SetMinThreads(originalMinWorkerThreads, originalMinCompletionPortThreads);
ThreadPool.SetMaxThreads(originalMaxWorkerThreads, originalMaxCompletionPortThreads);
pipeline?.Kill();
}
}
}
22 changes: 5 additions & 17 deletions MedallionShell/PipedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,21 @@ public override Process Process
private IReadOnlyList<int>? processIds;
public override IReadOnlyList<int> ProcessIds => this.processIds ??= this.first.ProcessIds.Concat(this.second.ProcessIds).ToList().AsReadOnly();

public override Task<CommandResult> Task
{
get { return this.task; }
}
public override Task<CommandResult> Task => this.task;

public override ProcessStreamWriter StandardInput
{
get { return this.first.StandardInput; }
}
public override ProcessStreamWriter StandardInput => this.first.StandardInput;

public override Streams.ProcessStreamReader StandardOutput
{
get { return this.second.StandardOutput; }
}
public override ProcessStreamReader StandardOutput => this.second.StandardOutput;

public override Streams.ProcessStreamReader StandardError
{
get { return this.second.StandardError; }
}
public override ProcessStreamReader StandardError => this.second.StandardError;

public override void Kill()
{
this.first.Kill();
this.second.Kill();
}

public override string ToString() => this.first + " | " + this.second;
public override string ToString() => $"{this.first} | {this.second}";

protected override void DisposeInternal()
{
Expand Down
15 changes: 9 additions & 6 deletions MedallionShell/PlatformCompatibilityHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ internal static class PlatformCompatibilityHelper
// see http://www.mono-project.com/docs/faq/technical/
public static readonly bool IsMono = Type.GetType("Mono.Runtime") != null;

public static bool IsWindows => RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
public static readonly bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);

/// <summary>
/// See https://github.com/dotnet/runtime/issues/81896 and
/// https://github.com/madelson/MedallionShell/issues/94
/// </summary>
public static bool ProcessStreamsUseSyncIO => IsWindows;

public static bool ProcessStreamWriteThrowsOnProcessEnd => !IsWindows || IsMono;

public static readonly CommandLineSyntax DefaultCommandLineSyntax =
IsMono && !IsWindows
? new MonoUnixCommandLineSyntax()
: new WindowsCommandLineSyntax();

public static Stream WrapStandardInputStreamIfNeeded(Stream stream)
{
return IsMono || !IsWindows ? new CompatibilityStandardInputWrapperStream(stream) : stream;
}

public static int SafeGetExitCode(this Process process)
{
if (IsMono)
Expand Down
10 changes: 1 addition & 9 deletions MedallionShell/ProcessCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,7 @@ internal ProcessCommand(
}
if (processStandardInput != null)
{
// Unfortunately, changing the encoding on older frameworks can't be done via ProcessStartInfo so
// we have to do it manually here. See https://github.com/dotnet/corefx/issues/20497

var wrappedStream = PlatformCompatibilityHelper.WrapStandardInputStreamIfNeeded(processStandardInput.BaseStream);
var standardInputEncodingToUse = standardInputEncoding ?? processStandardInput.Encoding;
var streamWriter = wrappedStream == processStandardInput.BaseStream && Equals(standardInputEncodingToUse, processStandardInput.Encoding)
? processStandardInput
: new StreamWriter(wrappedStream, standardInputEncodingToUse);
this.standardInput = new ProcessStreamWriter(streamWriter);
this.standardInput = new ProcessStreamWriter(processStandardInput, standardInputEncoding ?? processStandardInput.Encoding);
}

// according to https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.process.id?view=netcore-1.1#System_Diagnostics_Process_Id,
Expand Down
36 changes: 36 additions & 0 deletions MedallionShell/Shims/AsyncLocalShim.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// note: we use the shim implementation for our NETFRAMEWORK tests in DEBUG just to get coverage
#if NET45 || (DEBUG && NETFRAMEWORK)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.Remoting.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Medallion.Shell;

internal sealed class AsyncLocal<T> where T : class? // needed for ConditionalWeakTable impl; ok for our purposes
{
// CallContext values must be serializable; this indirection should eliminate any chance of that burning us
private static readonly ConditionalWeakTable<object, T> Storage = new();

private readonly string Key = $"AsyncLocal<{typeof(T)}>_{Guid.NewGuid()}";

public T? Value
{
get => CallContext.LogicalGetData(Key) is { } storageKey
? Storage.TryGetValue(storageKey, out var value)
? value
: throw new KeyNotFoundException()
: default;
set
{
object? storageKey;
if (value is null) { storageKey = null; }
else { Storage.Add(storageKey = new(), value); }
CallContext.LogicalSetData(Key, storageKey);
}
}
}
#endif
44 changes: 44 additions & 0 deletions MedallionShell/Shims/Shims.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Medallion.Shell;

internal static class Shims
{
public static T[] EmptyArray<T>() =>
#if NET45 || (NETFRAMEWORK && DEBUG) // for test coverage
Empty<T>.Array;

private static class Empty<T>
{
public static readonly T[] Array = new T[0];
}
#else
Array.Empty<T>();
#endif

public static Task<T> FaultedTask<T>(Exception exception)
{
#if NET45 || (NETFRAMEWORK && DEBUG) // for test coverage
TaskCompletionSource<T> taskCompletionSource = new();
taskCompletionSource.SetException(exception);
return taskCompletionSource.Task;
#else
return Task.FromException<T>(exception);
#endif
}

public static Task<T> CanceledTask<T>(CancellationToken cancellationToken)
{
#if NET45 || (NETFRAMEWORK && DEBUG) // for test coverage
TaskCompletionSource<T> taskCompletionSource = new();
taskCompletionSource.SetCanceled();
return taskCompletionSource.Task;
#else
return Task.FromCanceled<T>(cancellationToken);
#endif
}
}
26 changes: 5 additions & 21 deletions MedallionShell/Shims.cs → MedallionShell/Shims/SpanShims.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,15 @@
using System;
#if !NETCOREAPP2_1_OR_GREATER && !NETSTANDARD2_1
#pragma warning disable SA1649 // File name should match first type name

using System.Diagnostics;
using System.Runtime.CompilerServices;

namespace Medallion.Shell;
namespace System;

internal static class Shims
internal static class MemoryExtensions
{
public static T[] EmptyArray<T>() =>
#if !NET45
Array.Empty<T>();
#else
Empty<T>.Array;

private static class Empty<T>
{
public static readonly T[] Array = new T[0];
}
#endif

#if !NETCOREAPP2_1_OR_GREATER && !NETSTANDARD2_1
public static Span<T> AsSpan<T>(this T[] array, int start, int length) => new(new(array, start, length));
#endif
}

#pragma warning disable SA1306 // Field names should begin with lower-case letter

#if !NETCOREAPP2_1_OR_GREATER && !NETSTANDARD2_1
internal readonly struct Memory<T>
{
public readonly T[] Array;
Expand Down
19 changes: 19 additions & 0 deletions MedallionShell/Shims/ValueTupleShims.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#if !NETCOREAPP && !NET47_OR_GREATER && !NETSTANDARD2_0_OR_GREATER
#pragma warning disable SA1649 // File name should match first type name

namespace System;

internal struct ValueTuple<T1, T2, T3>
{
public T1 Item1;
public T2 Item2;
public T3 Item3;

public ValueTuple(T1 item1, T2 item2, T3 item3)
{
this.Item1 = item1;
this.Item2 = item2;
this.Item3 = item3;
}
}
#endif
Loading

0 comments on commit b0c1041

Please sign in to comment.