Skip to content

Commit

Permalink
Move stop callback notification on success when both reading and runn…
Browse files Browse the repository at this point in the history
…ing loops exited

Signed-off-by: Ilya <[email protected]>
  • Loading branch information
rihter007 committed Nov 25, 2021
1 parent 6dd21f7 commit 1606015
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions pkg/runner/step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type StepRunner struct {
addedTargets map[string]OnTargetResult
runningLoopActive bool

stepStopped chan struct{}
stopped chan struct{}
resultErr error
resultResumeState json.RawMessage
}
Expand All @@ -46,7 +46,7 @@ func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) err
}

if sr.stepIn == nil {
return fmt.Errorf("step runner is stepStopped")
return fmt.Errorf("step runner is stopped")
}

existingCb := sr.addedTargets[tgt.ID]
Expand Down Expand Up @@ -75,14 +75,14 @@ func (sr *StepRunner) IsRunning() bool {
sr.mu.Lock()
defer sr.mu.Unlock()

return sr.stepStopped != nil
return sr.stopped != nil
}

func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) {
sr.mu.Lock()
resultErr := sr.resultErr
resultResumeState := sr.resultResumeState
stepStopped := sr.stepStopped
stepStopped := sr.stopped
sr.mu.Unlock()

if resultErr != nil {
Expand Down Expand Up @@ -176,6 +176,7 @@ func (sr *StepRunner) runningLoop(
if recoverOccurred := safeCloseOutCh(stepOut); recoverOccurred {
sr.setErr(&cerrors.ErrTestStepClosedChannels{StepName: bundle.TestStepLabel})
}
sr.ctx.Debugf("output channel closed")
}()

defer func() {
Expand All @@ -196,7 +197,6 @@ func (sr *StepRunner) runningLoop(
sr.mu.Lock()
sr.setErrLocked(err)
sr.resultResumeState = resultResumeState
sr.notifyStoppedLocked(err)
sr.mu.Unlock()
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func NewStepRunner(
stepIn: stepIn,
addedTargets: make(map[string]OnTargetResult),
runningLoopActive: true,
stepStopped: make(chan struct{}),
stopped: make(chan struct{}),
stopCallback: stoppedCallback,
}

Expand All @@ -273,8 +273,11 @@ func NewStepRunner(
sr.mu.Lock()
defer sr.mu.Unlock()

close(sr.stepStopped)
sr.stepStopped = nil
close(sr.stopped)
sr.stopped = nil

// if an error occurred, this callback was invoked early
sr.notifyStoppedLocked(nil)
}

go func() {
Expand Down

0 comments on commit 1606015

Please sign in to comment.