Skip to content

Commit

Permalink
going back to what was working...
Browse files Browse the repository at this point in the history
  • Loading branch information
santisq committed Jul 8, 2024
1 parent 3d2582a commit 9d9a39f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 19 deletions.
8 changes: 4 additions & 4 deletions src/PSParallelPipeline/Commands/InvokeParallelCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ protected override void ProcessRecord()
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
_worker.Cancel();
_worker.Wait();
_worker.WaitOnCancel();
throw;
}
catch (OperationCanceledException exception)
{
_worker.Wait();
_worker.WaitOnCancel();

Check warning on line 99 in src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

View check run for this annotation

Codecov / codecov/patch

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs#L99

Added line #L99 was not covered by tests
exception.WriteTimeoutError(this);
}
catch (Exception exception)
Expand All @@ -121,12 +121,12 @@ protected override void EndProcessing()
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
_worker.Cancel();
_worker.Wait();
_worker.WaitOnCancel();
throw;
}
catch (OperationCanceledException exception)
{
_worker.Wait();
_worker.WaitOnCancel();
exception.WriteTimeoutError(this);
}
catch (Exception exception)
Expand Down
1 change: 1 addition & 0 deletions src/PSParallelPipeline/RunspacePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ private async Task<Runspace> GetRunspaceAsync()
{
return runspace;
}

return CreateRunspace();
}

Expand Down
28 changes: 13 additions & 15 deletions src/PSParallelPipeline/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ internal Worker(PoolSettings settings)

internal void Wait() => _worker?.Wait();

internal void WaitOnCancel() => _worker?
.ContinueWith(
_ => { _runspacePool.WaitOnCancel(); },
TaskContinuationOptions.NotOnRanToCompletion)
.Wait();

internal void Cancel() => _cts.Cancel();

internal void CancelAfter(TimeSpan span) => _cts.CancelAfter(span);
Expand Down Expand Up @@ -64,32 +70,24 @@ internal IEnumerable<PSOutputData> GetConsumingEnumerable() =>

private async Task Start()
{
try
while (!_inputQueue.IsCompleted)
{
while (!_inputQueue.IsCompleted)
if (_inputQueue.TryTake(out PSTask ps, 0, Token))
{
if (_inputQueue.TryTake(out PSTask ps, 0, Token))
{
await _runspacePool.EnqueueAsync(ps);
}
await _runspacePool.EnqueueAsync(ps);
}

await _runspacePool.ProcessAllAsync();
OutputPipe.CompleteAdding();
}
catch (OperationCanceledException)
{
_runspacePool.WaitOnCancel();
_runspacePool.Dispose();
}

await _runspacePool.ProcessAllAsync();
OutputPipe.CompleteAdding();
}

public void Dispose()
{
_inputQueue.Dispose();
OutputStreams.Dispose();
_runspacePool.Dispose();
_cts.Dispose();
_runspacePool.Dispose();
GC.SuppressFinalize(this);
}
}

0 comments on commit 9d9a39f

Please sign in to comment.