Skip to content

Commit

Permalink
Merge pull request #36 from santisq/35-invoke-parallel-not-correctly-…
Browse files Browse the repository at this point in the history
…disposing-runspaces

Fixes `Invoke-Parallel` not disposing Runspaces
  • Loading branch information
santisq authored Jun 26, 2024
2 parents 8fe77ca + 8b3cfb4 commit 3bff059
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 61 deletions.
2 changes: 1 addition & 1 deletion module/PSParallelPipeline.psd1
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll'

# Version number of this module.
ModuleVersion = '1.1.7'
ModuleVersion = '1.1.8'

# Supported PSEditions
# CompatiblePSEditions = @()
Expand Down
4 changes: 0 additions & 4 deletions src/PSParallelPipeline/ExceptionHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ internal static void WriteTimeoutError(this Exception exception, PSCmdlet cmdlet
ErrorCategory.OperationTimeout,
cmdlet));

// internal static void WriteEndProcessingError(this Exception exception, PSCmdlet cmdlet) =>
// cmdlet.WriteError(new ErrorRecord(
// exception, "EndProcessingOutput", ErrorCategory.NotSpecified, cmdlet));

internal static void WriteUnspecifiedError(this Exception exception, PSCmdlet cmdlet) =>
cmdlet.WriteError(new ErrorRecord(
exception, "UnspecifiedCmdletError", ErrorCategory.NotSpecified, cmdlet));
Expand Down
22 changes: 12 additions & 10 deletions src/PSParallelPipeline/PSTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,34 @@ internal Runspace Runspace

private readonly PowerShell _powershell;

private readonly PSDataStreams _streams;
private readonly PSDataStreams _internalStreams;

private readonly RunspacePool _pool;

private PSTask(RunspacePool runspacePool)
{
_powershell = PowerShell.Create();
_streams = _powershell.Streams;
_internalStreams = _powershell.Streams;
_pool = runspacePool;
}

static internal PSTask Create(RunspacePool runspacePool)
{
PSTask ps = new(runspacePool);
HookStreams(ps, runspacePool.PSOutputStreams);
HookStreams(ps._internalStreams, runspacePool.PSOutputStreams);
return ps;
}

private static void HookStreams(PSTask ps, PSOutputStreams outputStreams)
private static void HookStreams(
PSDataStreams streams,
PSOutputStreams outputStreams)
{
ps._streams.Error = outputStreams.Error;
ps._streams.Debug = outputStreams.Debug;
ps._streams.Information = outputStreams.Information;
ps._streams.Progress = outputStreams.Progress;
ps._streams.Verbose = outputStreams.Verbose;
ps._streams.Warning = outputStreams.Warning;
streams.Error = outputStreams.Error;
streams.Debug = outputStreams.Debug;
streams.Information = outputStreams.Information;
streams.Progress = outputStreams.Progress;
streams.Verbose = outputStreams.Verbose;
streams.Warning = outputStreams.Warning;
}

private static Task InvokePowerShellAsync(
Expand Down
19 changes: 13 additions & 6 deletions src/PSParallelPipeline/RunspacePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace PSParallelPipeline;

internal sealed class RunspacePool : IDisposable
{
internal PSOutputStreams PSOutputStreams { get => _woker.OutputStreams; }
internal PSOutputStreams PSOutputStreams { get => _worker.OutputStreams; }

private CancellationToken Token { get => _woker.Token; }
private CancellationToken Token { get => _worker.Token; }

private InitialSessionState InitialSessionState { get => _settings.InitialSessionState; }

Expand All @@ -28,20 +28,24 @@ internal sealed class RunspacePool : IDisposable

private readonly PoolSettings _settings;

private readonly Worker _woker;
private readonly Worker _worker;

private readonly List<Runspace> _createdRunspaces;

internal RunspacePool(PoolSettings settings, Worker worker)
{
_settings = settings;
_woker = worker;
_worker = worker;
_pool = new Queue<Runspace>(MaxRunspaces);
_tasks = new List<Task<PSTask>>(MaxRunspaces);
_createdRunspaces = new List<Runspace>(MaxRunspaces);
}

private Runspace CreateRunspace()
{
Runspace rs = RunspaceFactory.CreateRunspace(InitialSessionState);
rs.Open();
_createdRunspaces.Add(rs);
return rs;
}

Expand Down Expand Up @@ -122,9 +126,12 @@ internal CancellationTokenRegistration RegisterCancellation(Action callback) =>

public void Dispose()
{
foreach (Runspace runspace in _pool)
foreach (Runspace runspace in _createdRunspaces)
{
runspace.Dispose();
if (runspace is { RunspaceAvailability: RunspaceAvailability.Available })
{
runspace.Dispose();
}
}

GC.SuppressFinalize(this);
Expand Down
102 changes: 62 additions & 40 deletions tests/PSParallelPipeline.tests.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,20 @@ Describe PSParallelPipeline {

Context 'TimeoutSeconds Parameter' {
It 'Stops processing after the specified seconds' {
$timer = [Stopwatch]::StartNew()
{ 0..5 | Invoke-Parallel { Start-Sleep 10 } -TimeoutSeconds 2 -ErrorAction Stop } |
Should -Throw -ExceptionType ([TimeoutException])
$timer.Stop()
$timer.Elapsed | Should -BeLessOrEqual ([timespan]::FromSeconds(2.1))
Assert-RunspaceCount {
$timer = [Stopwatch]::StartNew()
{
$invokeParallelSplat = @{
ThrottleLimit = 100
TimeOutSeconds = 2
ErrorAction = 'Stop'
ScriptBlock = { Start-Sleep 10 }
}
1..100 | Invoke-Parallel @invokeParallelSplat
} | Should -Throw -ExceptionType ([TimeoutException])
$timer.Stop()
$timer.Elapsed | Should -BeLessOrEqual ([timespan]::FromSeconds(2.2))
}
}
}

Expand All @@ -153,6 +162,9 @@ Describe PSParallelPipeline {
Complete 'Invoke-Parallel -Functions Compl' |
ForEach-Object ListItemText |
Should -Contain 'Complete'

Complete 'Invoke-Parallel -Functions NotExist' |
Should -BeNullOrEmpty
}
}

Expand Down Expand Up @@ -186,12 +198,12 @@ Describe PSParallelPipeline {

Context 'Script Block Assertions' {
It 'Should throw on passed-in Script Block via $using: scope modifier' {
{ $sb = { }; 1..1 | Invoke-Parallel { $using:sb } } |
{ $sb = { 1 + 1 }; 1..1 | Invoke-Parallel { & $using:sb } } |
Should -Throw -ExceptionType ([PSArgumentException])
}

It 'Should throw on passed-in Script Block via -Variables parameter' {
{ $sb = { }; 1..1 | Invoke-Parallel { $sb } -Variables @{ sb = $sb } } |
{ $sb = { 1 + 1 }; 1..1 | Invoke-Parallel { & $sb } -Variables @{ sb = $sb } } |
Should -Throw -ExceptionType ([PSArgumentException])
}

Expand All @@ -202,6 +214,21 @@ Describe PSParallelPipeline {
}

Context 'Invoke-Parallel' {
BeforeAll {
$testOne = {
1..100 | Invoke-Parallel {
0..10 | ForEach-Object { Start-Sleep 1; $_ }
} -ThrottleLimit 100 | Select-Object -First 5
}

$testTwo = {
1..100 | Invoke-Parallel { Start-Sleep 1; $_ } -ThrottleLimit 100 |
Select-Object -First 10
}

$testOne, $testTwo | Out-Null
}

It 'Process in parallel' {
$timer = [Stopwatch]::StartNew()
1..5 | Invoke-Parallel { Start-Sleep 1 }
Expand All @@ -210,23 +237,15 @@ Describe PSParallelPipeline {
}

It 'Supports streaming output' {
Measure-Command {
0..10 | Invoke-Parallel {
0..10 | ForEach-Object {
Start-Sleep 1
$_
}
} | Select-Object -First 5 |
Should -HaveCount 5
} | ForEach-Object TotalSeconds |
Should -BeLessThan 2

Measure-Command {
0..10 | Invoke-Parallel { Start-Sleep 1; $_ } -ThrottleLimit 2 |
Select-Object -First 10 |
Should -HaveCount 10
} | ForEach-Object TotalSeconds |
Should -BeLessThan 6
Assert-RunspaceCount {
Measure-Command { & $testOne | Should -HaveCount 5 } |
ForEach-Object TotalSeconds |
Should -BeLessThan 2

Measure-Command { & $testTwo | Should -HaveCount 10 } |
ForEach-Object TotalSeconds |
Should -BeLessThan 6
}
}

It 'Can add items to a single thread instance' {
Expand All @@ -239,25 +258,28 @@ Describe PSParallelPipeline {
}

It 'Stops processing on CTRL+C' {
try {
$timer = [Stopwatch]::StartNew()
$ps = [powershell]::Create([RunspaceMode]::NewRunspace).
AddScript('0..10 | Invoke-Parallel { Start-Sleep 1; $_ }')
$async = $ps.BeginInvoke()
Assert-RunspaceCount {
try {
$rs = [runspacefactory]::CreateRunspace()
$rs.Open()
$ps = [powershell]::Create().
AddScript('1..100 | Invoke-Parallel { Start-Sleep 1; $_ } -ThrottleLimit 100')
$ps.Runspace = $rs
$timer = [Stopwatch]::StartNew()
$null = $ps.BeginInvoke()
$ps.Stop()

while ($ps.InvocationStateInfo.State -eq [PSInvocationState]::Running) {
Start-Sleep -Milliseconds 50
}

if ($IsCoreCLR) {
$async = $ps.BeginStop($ps.EndStop, $null)
$timer.Stop()
$timer.Elapsed | Should -BeLessOrEqual ([timespan]::FromSeconds(1))
}
else {
$ps.Stop()
finally {
$ps.Dispose()
$rs.Dispose()
}

while (-not $async.AsyncWaitHandle.WaitOne(200)) { }
$timer.Stop()
$timer.Elapsed | Should -BeLessOrEqual ([timespan]::FromSeconds(1))
}
finally {
$ps.Dispose()
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions tests/common.psm1
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,20 @@ function Complete {
$null).CompletionMatches
}
}

function Assert-RunspaceCount {
[CmdletBinding()]
param(
[Parameter(Mandatory)]
[scriptblock] $ScriptBlock
)

try {
$count = @(Get-Runspace).Count
& $ScriptBlock
}
finally {
Get-Runspace |
Should -HaveCount $count -Because 'Runspaces should be correctly disposed'
}
}

0 comments on commit 3bff059

Please sign in to comment.