From 240db9ec1f10a9a9c4b3fe34bc536720d91e4454 Mon Sep 17 00:00:00 2001 From: Ilya Date: Wed, 24 Nov 2021 17:33:21 +0000 Subject: [PATCH 01/20] Implement StepRunner Signed-off-by: Ilya --- pkg/cerrors/cerrors.go | 10 ++ pkg/runner/step_runner.go | 271 ++++++++++++++++++++++++++++++++++++++ pkg/runner/test_runner.go | 2 + 3 files changed, 283 insertions(+) create mode 100644 pkg/runner/step_runner.go diff --git a/pkg/cerrors/cerrors.go b/pkg/cerrors/cerrors.go index 12077432..5d3f36e5 100644 --- a/pkg/cerrors/cerrors.go +++ b/pkg/cerrors/cerrors.go @@ -44,6 +44,16 @@ func (e *ErrTestStepPaniced) Error() string { return fmt.Sprintf("test step %s paniced, trace: %q", e.StepName, e.StackTrace) } +// ErrTestStepReturnedNoTarget indicates that a test step returned nil Target +type ErrTestStepReturnedNoTarget struct { + StepName string +} + +// Error returns the error string associated with the error +func (e *ErrTestStepReturnedNoTarget) Error() string { + return fmt.Sprintf("test step %s returned nil result", e.StepName) +} + // ErrTestStepReturnedDuplicateResult indicates that a test step returned result // twice for the same target. type ErrTestStepReturnedDuplicateResult struct { diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go new file mode 100644 index 00000000..1362a1bc --- /dev/null +++ b/pkg/runner/step_runner.go @@ -0,0 +1,271 @@ +package runner + +import ( + "context" + "encoding/json" + "fmt" + "runtime/debug" + "sync" + + "github.com/linuxboot/contest/pkg/cerrors" + "github.com/linuxboot/contest/pkg/event/testevent" + "github.com/linuxboot/contest/pkg/target" + "github.com/linuxboot/contest/pkg/test" + "github.com/linuxboot/contest/pkg/xcontext" +) + +type OnTargetResult func(res error) +type OnStepRunnerStopped func(err error) + +type StepRunner struct { + ctx xcontext.Context + cancel context.CancelFunc + mu sync.Mutex + stopCallback OnStepRunnerStopped + + stepIn chan *target.Target + addedTargets map[string]OnTargetResult + + stepStopped chan struct{} + resultErr error + resultResumeState json.RawMessage +} + +func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) error { + if callback == nil { + return fmt.Errorf("callback should not be nil") + } + err := func(tgt *target.Target, callback OnTargetResult) error { + sr.mu.Lock() + defer sr.mu.Unlock() + + if sr.ctx.Err() != nil { + return sr.ctx.Err() + } + + if sr.stepIn == nil { + return fmt.Errorf("step runner is stepStopped") + } + + existingCb := sr.addedTargets[tgt.ID] + if existingCb != nil { + return fmt.Errorf("existing target") + } + sr.addedTargets[tgt.ID] = callback + return nil + }(tgt, callback) + if err != nil { + return err + } + + // check if running + select { + case sr.stepIn <- tgt: + case <-sr.ctx.Until(xcontext.ErrPaused): + return xcontext.ErrPaused + case <-sr.ctx.Done(): + return sr.ctx.Err() + } + return nil +} + +func (sr *StepRunner) IsRunning() bool { + sr.mu.Lock() + defer sr.mu.Unlock() + + return sr.stepStopped != nil +} + +func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) { + sr.mu.Lock() + resultErr := sr.resultErr + resultResumeState := sr.resultResumeState + stepStopped := sr.stepStopped + sr.mu.Unlock() + + if resultErr != nil { + return resultResumeState, resultErr + } + + if stepStopped != nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-stepStopped: + } + } + + sr.mu.Lock() + defer sr.mu.Unlock() + return sr.resultResumeState, sr.resultErr +} + +// Stop triggers TestStep to stop running by closing input channel +func (sr *StepRunner) Stop() { + sr.mu.Lock() + defer sr.mu.Unlock() + + if sr.stepIn != nil { + sr.ctx.Debugf("Close input channel") + close(sr.stepIn) + sr.stepIn = nil + } +} + +func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabel string) { + invokePanicSafe := func(callback OnTargetResult, res error) { + if r := recover(); r != nil { + sr.ctx.Errorf("Callback panic, stack: %s", debug.Stack()) + } + callback(res) + } + + for { + select { + case res, ok := <-stepOut: + if !ok { + sr.ctx.Debugf("Output channel closed") + + if sr.IsRunning() { + // This means that plugin closed its channels before leaving. + sr.setErr(&cerrors.ErrTestStepClosedChannels{StepName: testStepLabel}) + } + return + } + + if res.Target == nil { + sr.setErr(&cerrors.ErrTestStepReturnedNoTarget{StepName: testStepLabel}) + return + } + + sr.mu.Lock() + callback, found := sr.addedTargets[res.Target.ID] + sr.addedTargets[res.Target.ID] = nil + sr.mu.Unlock() + + if !found { + sr.setErr(&cerrors.ErrTestStepReturnedUnexpectedResult{StepName: testStepLabel, Target: res.Target.ID}) + return + } + if callback == nil { + sr.setErr(&cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}) + return + } + invokePanicSafe(callback, res.Err) + + case <- sr.ctx.Done(): + sr.ctx.Debugf("canceled readingLoop") + return + } + } +} + +func (sr *StepRunner) runningLoop( + stepIn chan *target.Target, stepOut chan test.TestStepResult, + bundle test.TestStepBundle, ev testevent.Emitter, resumeState json.RawMessage, +) { + defer func() { + if recoverOccurred := safeCloseOutCh(stepOut); recoverOccurred { + sr.setErr(&cerrors.ErrTestStepClosedChannels{StepName: bundle.TestStepLabel}) + } + }() + + defer func() { + if r := recover(); r != nil { + sr.mu.Lock() + sr.setErrLocked(&cerrors.ErrTestStepPaniced{ + StepName: bundle.TestStepLabel, + StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), + }) + close(sr.stepStopped) + sr.stepStopped = nil + sr.mu.Unlock() + } + }() + + inChannels := test.TestStepChannels{In: stepIn, Out: stepOut} + resultResumeState, err := bundle.TestStep.Run(sr.ctx, inChannels, bundle.Parameters, ev, resumeState) + sr.ctx.Debugf("%s: step runner finished %v, rs %s", bundle.TestStepLabel, err, string(resultResumeState)) + + sr.mu.Lock() + sr.setErrLocked(err) + sr.resultResumeState = resultResumeState + close(sr.stepStopped) + sr.stepStopped = nil + sr.notifyStoppedLocked(nil) + sr.mu.Unlock() +} + +// setErr sets step runner error unless already set. +func (sr *StepRunner) setErr(err error) { + sr.mu.Lock() + defer sr.mu.Unlock() + sr.setErrLocked(err) +} + +func (sr *StepRunner) setErrLocked(err error) { + if err == nil || sr.resultErr != nil { + return + } + sr.ctx.Errorf("err: %v", err) + sr.resultErr = err + sr.notifyStoppedLocked(sr.resultErr) +} + +func (sr *StepRunner) notifyStoppedLocked(err error) { + sr.cancel() + if sr.stopCallback == nil { + return + } + stopCallback := sr.stopCallback + sr.stopCallback = nil + + go func() { + if r := recover(); r != nil { + sr.ctx.Errorf("Stop callback panic, stack: %s", debug.Stack()) + } + stopCallback(err) + }() +} + +func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) { + recoverOccurred = false + defer func() { + if r := recover(); r != nil { + recoverOccurred = true + } + }() + close(ch) + return +} + +// NewStepRunner creates a new StepRunner object +func NewStepRunner( + ctx xcontext.Context, + bundle test.TestStepBundle, + ev testevent.Emitter, + resumeState json.RawMessage, + stoppedCallback OnStepRunnerStopped, +) *StepRunner { + stepIn := make(chan *target.Target) + stepOut := make(chan test.TestStepResult) + + srCrx, cancel := xcontext.WithCancel(ctx) + sr := &StepRunner{ + ctx: srCrx, + cancel: cancel, + stepIn: stepIn, + addedTargets: make(map[string]OnTargetResult), + stepStopped: make(chan struct{}), + stopCallback: stoppedCallback, + } + + go func() { + sr.runningLoop(stepIn, stepOut, bundle, ev, resumeState) + }() + + go func() { + sr.readingLoop(stepOut, bundle.TestStepLabel) + }() + return sr +} diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 943d5aa1..681cdaa2 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -59,6 +59,8 @@ type stepState struct { stepIndex int // Index of this step in the pipeline. sb test.TestStepBundle // The test bundle. + stepRunner *StepRunner + // Channels used to communicate with the plugin. inCh chan *target.Target outCh chan test.TestStepResult From b451ac5994d0f1a9f41dd22fecdb7052d762107b Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 25 Nov 2021 16:25:42 +0000 Subject: [PATCH 02/20] Use StepRunner in TestRunner Signed-off-by: Ilya --- pkg/runner/test_runner.go | 264 +++++++++++--------------------------- 1 file changed, 73 insertions(+), 191 deletions(-) diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 681cdaa2..5971814c 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -6,9 +6,9 @@ package runner import ( + "context" "encoding/json" "fmt" - "runtime/debug" "sync" "time" @@ -59,20 +59,11 @@ type stepState struct { stepIndex int // Index of this step in the pipeline. sb test.TestStepBundle // The test bundle. + ev testevent.Emitter stepRunner *StepRunner - // Channels used to communicate with the plugin. - inCh chan *target.Target - outCh chan test.TestStepResult - ev testevent.Emitter - - tgtDone map[*target.Target]bool // Targets for which results have been received. - - resumeState json.RawMessage // Resume state passed to and returned by the Run method. - stepStarted bool // testStep.Run() has been invoked - stepRunning bool // testStep.Run() is currently running. - readerRunning bool // Result reader is running. - runErr error // Runner error, returned from Run() or an error condition detected by the reader. + resumeState json.RawMessage // Resume state passed to and returned by the Run method. + runErr error // Runner error, returned from Run() or an error condition detected by the reader. } // targetStepPhase denotes progression of a target through a step @@ -156,10 +147,7 @@ func (tr *TestRunner) Run( cancel: stepCancel, stepIndex: i, sb: sb, - inCh: make(chan *target.Target), - outCh: make(chan test.TestStepResult), ev: emitterFactory.New(sb.TestStepLabel), - tgtDone: make(map[*target.Target]bool), resumeState: srs, }) // Step handlers will be started from target handlers as targets reach them. @@ -248,7 +236,8 @@ func (tr *TestRunner) Run( ctx.Debugf("- %d in flight, ok to resume? %t", numInFlightTargets, resumeOk) ctx.Debugf("step states:") for i, ss := range tr.steps { - ctx.Debugf(" %d %s %t %t %v %s", i, ss, ss.stepRunning, ss.readerRunning, ss.runErr, ss.resumeState) + ctx.Debugf(" %d %s %t %t %v %s", + i, ss, ss.stepRunner != nil, ss.stepRunner != nil && ss.stepRunner.IsRunning(), ss.runErr, ss.resumeState) } // Is there a useful error to report? @@ -292,69 +281,36 @@ func (tr *TestRunner) Run( func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { ctx.Debugf("waiting for step runners to finish") - swch := make(chan struct{}) - go func() { - tr.mu.Lock() - defer tr.mu.Unlock() - for { - ok := true - for _, ss := range tr.steps { - // numRunning == 1 is also acceptable: we allow the Run() goroutine - // to continue in case of error, if the result processor decided - // to abandon its runner, there's nothing we can do. - switch { - case !ss.stepRunning && !ss.readerRunning: - // Done - case ss.stepRunning && ss.readerRunning: - // Still active - ok = false - case !ss.stepRunning && ss.readerRunning: - // Transient state, let it finish - ok = false - case ss.stepRunning && !ss.readerRunning: - // This is possible if plugin got stuck and result processor gave up on it. - // If so, it should have left an error. - if ss.runErr == nil { - ctx.Errorf("%s: result processor left runner with no error", ss) - // There's nothing we can do at this point, fall through. - } - } - } - if ok { - close(swch) - return - } - tr.cond.Wait() - } - }() - var err error - select { - case <-swch: - ctx.Debugf("step runners finished") - tr.mu.Lock() - defer tr.mu.Unlock() - err = tr.checkStepRunnersLocked() - case <-time.After(tr.shutdownTimeout): - ctx.Errorf("step runners failed to shut down correctly") + + shutdownCtx, cancel := context.WithTimeout(ctx, tr.shutdownTimeout) + defer cancel() + + var neverReturnedErr *cerrors.ErrTestStepsNeverReturned + for _, ss := range tr.steps { tr.mu.Lock() - defer tr.mu.Unlock() - // If there is a step with an error set, use that. - err = tr.checkStepRunnersLocked() - // If there isn't, enumerate ones that were still running at the time. - nrerr := &cerrors.ErrTestStepsNeverReturned{} - if err == nil { - err = nrerr + stepRunner := ss.stepRunner + tr.mu.Unlock() + + if stepRunner == nil { + continue } - for _, ss := range tr.steps { - if ss.stepRunning { - ss.setErrLocked(&cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}}) - nrerr.StepNames = append(nrerr.StepNames, ss.sb.TestStepLabel) - // Cancel this step's context, this will help release the reader. - ss.cancel() + resumeState, err := stepRunner.WaitResults(shutdownCtx) + if err == context.DeadlineExceeded { + err = &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}} + if neverReturnedErr == nil { + neverReturnedErr = &cerrors.ErrTestStepsNeverReturned{} } + neverReturnedErr.StepNames = append(neverReturnedErr.StepNames, ss.sb.TestStepLabel) + // Cancel this step's context, this will help release the reader. + ss.cancel() } + tr.mu.Lock() + ss.resumeState = resumeState + ss.setErrLocked(err) + tr.mu.Unlock() } - // Emit step error events. + + tr.mu.Lock() for _, ss := range tr.steps { if ss.runErr != nil && ss.runErr != xcontext.ErrPaused && ss.runErr != xcontext.ErrCanceled { if err := ss.emitEvent(ctx, EventTestError, nil, ss.runErr.Error()); err != nil { @@ -362,29 +318,40 @@ func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { } } } - return err + tr.mu.Unlock() + + resultErr := tr.checkStepRunnersLocked() + if neverReturnedErr != nil && resultErr == nil { + resultErr = neverReturnedErr + } + return resultErr } func (tr *TestRunner) injectTarget(ctx xcontext.Context, tgs *targetState, ss *stepState) error { - var err error ctx.Debugf("%s: injecting into %s", tgs, ss) - select { - case ss.inCh <- tgs.tgt: - // Injected successfully. - err = ss.emitEvent(ctx, target.EventTargetIn, tgs.tgt, nil) - tr.mu.Lock() - defer tr.mu.Unlock() - // By the time we get here the target could have been processed and result posted already, hence the check. - if tgs.CurPhase == targetStepPhaseBegin { - tgs.CurPhase = targetStepPhaseRun + + tgt := tgs.tgt + err := ss.stepRunner.AddTarget(tgt, func(res error) { + if err := tr.reportTargetResult(ss, tgt, res); err != nil { + ctx.Errorf("Reporting target result failed: %v", err) + tr.mu.Lock() + defer tr.mu.Unlock() + ss.setErrLocked(err) } - if err != nil { + tr.cond.Signal() + }) + if err == nil { + if err = ss.emitEvent(ctx, target.EventTargetIn, tgs.tgt, nil); err != nil { err = fmt.Errorf("failed to report target injection: %w", err) } - case <-ctx.Until(xcontext.ErrPaused): - err = xcontext.ErrPaused - case <-ctx.Done(): - err = xcontext.ErrCanceled + func() { + tr.mu.Lock() + defer tr.mu.Unlock() + // By the time we get here the target could have been processed and result posted already, hence the check. + if tgs.CurPhase == targetStepPhaseBegin { + tgs.CurPhase = targetStepPhaseRun + } + }() } tr.cond.Signal() return err @@ -528,14 +495,18 @@ loop: func (tr *TestRunner) runStepIfNeeded(ss *stepState) { tr.mu.Lock() defer tr.mu.Unlock() - if ss.stepStarted { + + if ss.stepRunner != nil { return } - ss.stepStarted = true - ss.stepRunning = true - ss.readerRunning = true - go tr.stepRunner(ss) - go tr.stepReader(ss) + ctx := xcontext.WithValue(ss.ctx, "step_index", ss.stepIndex) + ctx = xcontext.WithValue(ctx, "step_label", ss.sb.TestStepLabel) + ss.stepRunner = NewStepRunner(ctx, ss.sb, ss.ev, ss.resumeState, func(err error) { + tr.mu.Lock() + defer tr.mu.Unlock() + ss.setErrLocked(err) + tr.cond.Signal() + }) } func (ss *stepState) setErr(mu sync.Locker, err error) { @@ -572,40 +543,6 @@ func (ss *stepState) emitEvent(ctx xcontext.Context, name event.Name, tgt *targe return ss.ev.Emit(ctx, errEv) } -// stepRunner runs a test pipeline's step (the Run() method). -func (tr *TestRunner) stepRunner(ss *stepState) { - ss.ctx.Debugf("%s: step runner active", ss) - defer func() { - if r := recover(); r != nil { - tr.mu.Lock() - ss.stepRunning = false - ss.setErrLocked(&cerrors.ErrTestStepPaniced{ - StepName: ss.sb.TestStepLabel, - StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), - }) - tr.mu.Unlock() - tr.safeCloseOutCh(ss) - } - }() - tr.mu.Lock() - var runErr error - resumeState := ss.resumeState - ss.resumeState = nil - tr.mu.Unlock() - chans := test.TestStepChannels{In: ss.inCh, Out: ss.outCh} - resumeState, runErr = ss.sb.TestStep.Run(ss.ctx, chans, ss.sb.Parameters, ss.ev, resumeState) - ss.ctx.Debugf("%s: step runner finished %v, rs %s", ss, runErr, string(resumeState)) - tr.mu.Lock() - ss.stepRunning = false - ss.setErrLocked(runErr) - if runErr == xcontext.ErrPaused { - ss.resumeState = resumeState - } - tr.mu.Unlock() - // Signal to the result processor that no more will be coming. - tr.safeCloseOutCh(ss) -} - // reportTargetResult reports result of executing a step to the appropriate target handler. func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res error) error { resCh, err := func() (chan error, error) { @@ -618,13 +555,6 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res Target: tgt.ID, } } - if ss.tgtDone[tgt] { - return nil, &cerrors.ErrTestStepReturnedDuplicateResult{ - StepName: ss.sb.TestStepLabel, - Target: tgt.ID, - } - } - ss.tgtDone[tgt] = true // Begin is also allowed here because it may happen that we get a result before target handler updates phase. if tgs.CurStep != ss.stepIndex || (tgs.CurPhase != targetStepPhaseBegin && tgs.CurPhase != targetStepPhaseRun) { @@ -659,56 +589,6 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res return nil } -func (tr *TestRunner) safeCloseOutCh(ss *stepState) { - defer func() { - if r := recover(); r != nil { - ss.setErr(&tr.mu, &cerrors.ErrTestStepClosedChannels{StepName: ss.sb.TestStepLabel}) - } - }() - close(ss.outCh) -} - -// stepReader receives results from the step's output channel and forwards them to the appropriate target handlers. -func (tr *TestRunner) stepReader(ss *stepState) { - ss.ctx.Debugf("%s: step reader active", ss) - var err error - cancelCh := ss.ctx.Done() - var shutdownTimeoutCh <-chan time.Time -loop: - for { - select { - case res, ok := <-ss.outCh: - if !ok { - ss.ctx.Debugf("%s: out chan closed", ss) - tr.mu.Lock() - if ss.stepRunning { - // This means that plugin closed its channels before leaving. - err = &cerrors.ErrTestStepClosedChannels{StepName: ss.sb.TestStepLabel} - } - tr.mu.Unlock() - break loop - } - if err = tr.reportTargetResult(ss, res.Target, res.Err); err != nil { - break loop - } - case <-cancelCh: - ss.ctx.Debugf("%s: canceled 3, draining", ss) - // Allow some time to drain - cancelCh = nil - shutdownTimeoutCh = time.After(tr.shutdownTimeout) - case <-shutdownTimeoutCh: - err = &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}} - break loop - } - } - tr.mu.Lock() - defer tr.mu.Unlock() - ss.setErrLocked(err) - ss.readerRunning = false - ss.ctx.Debugf("%s: step reader finished, %t %t %v", ss, ss.stepRunning, ss.readerRunning, ss.runErr) - tr.cond.Signal() -} - // checkStepRunnersLocked checks if any step runner has encountered an error. func (tr *TestRunner) checkStepRunnersLocked() error { for i, ss := range tr.steps { @@ -774,7 +654,9 @@ stepLoop: } // All targets ok, close the step's input channel. ctx.Debugf("monitor pass %d: %s: no more targets, closing input channel", pass, ss) - close(ss.inCh) + if ss.stepRunner != nil { + ss.stepRunner.Stop() + } step++ } // Wait for all the targets to finish. @@ -795,7 +677,7 @@ tgtLoop: // It's been paused, this is fine. continue } - if ss.stepStarted && !ss.readerRunning { + if ss.stepRunner != nil && !ss.stepRunner.IsRunning() { // Target has been injected but step runner has exited without a valid reason, this target has been lost. runErr = &cerrors.ErrTestStepLostTargets{ StepName: ss.sb.TestStepLabel, @@ -820,7 +702,7 @@ tgtLoop: func NewTestRunnerWithTimeouts(shutdownTimeout time.Duration) *TestRunner { tr := &TestRunner{ - shutdownTimeout: shutdownTimeout, + shutdownTimeout: shutdownTimeout, } tr.cond = sync.NewCond(&tr.mu) return tr From 377a27005b93a61e2cd889d4f0670f12ce08add8 Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 25 Nov 2021 16:30:01 +0000 Subject: [PATCH 03/20] Remove unused stepState.setErr function Signed-off-by: Ilya --- pkg/runner/test_runner.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 5971814c..144786de 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -509,12 +509,6 @@ func (tr *TestRunner) runStepIfNeeded(ss *stepState) { }) } -func (ss *stepState) setErr(mu sync.Locker, err error) { - mu.Lock() - defer mu.Unlock() - ss.setErrLocked(err) -} - // setErrLocked sets step runner error unless already set. func (ss *stepState) setErrLocked(err error) { if err == nil || ss.runErr != nil { From ebbd6647cc130b202a4fadcfefa86da164c63d82 Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 25 Nov 2021 17:50:41 +0000 Subject: [PATCH 04/20] Treat StepRunner stopped when both reading and running loops exited Signed-off-by: Ilya --- pkg/runner/step_runner.go | 63 +++++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 1362a1bc..6b007b05 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -6,6 +6,7 @@ import ( "fmt" "runtime/debug" "sync" + "sync/atomic" "github.com/linuxboot/contest/pkg/cerrors" "github.com/linuxboot/contest/pkg/event/testevent" @@ -18,16 +19,17 @@ type OnTargetResult func(res error) type OnStepRunnerStopped func(err error) type StepRunner struct { - ctx xcontext.Context - cancel context.CancelFunc - mu sync.Mutex + ctx xcontext.Context + cancel context.CancelFunc + mu sync.Mutex stopCallback OnStepRunnerStopped - stepIn chan *target.Target - addedTargets map[string]OnTargetResult + stepIn chan *target.Target + addedTargets map[string]OnTargetResult + runningLoopActive bool - stepStopped chan struct{} - resultErr error + stepStopped chan struct{} + resultErr error resultResumeState json.RawMessage } @@ -126,10 +128,12 @@ func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabe if !ok { sr.ctx.Debugf("Output channel closed") - if sr.IsRunning() { + sr.mu.Lock() + if sr.runningLoopActive { // This means that plugin closed its channels before leaving. - sr.setErr(&cerrors.ErrTestStepClosedChannels{StepName: testStepLabel}) + sr.setErrLocked(&cerrors.ErrTestStepClosedChannels{StepName: testStepLabel}) } + sr.mu.Unlock() return } @@ -153,7 +157,7 @@ func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabe } invokePanicSafe(callback, res.Err) - case <- sr.ctx.Done(): + case <-sr.ctx.Done(): sr.ctx.Debugf("canceled readingLoop") return } @@ -165,6 +169,10 @@ func (sr *StepRunner) runningLoop( bundle test.TestStepBundle, ev testevent.Emitter, resumeState json.RawMessage, ) { defer func() { + sr.mu.Lock() + sr.runningLoopActive = false + sr.mu.Unlock() + if recoverOccurred := safeCloseOutCh(stepOut); recoverOccurred { sr.setErr(&cerrors.ErrTestStepClosedChannels{StepName: bundle.TestStepLabel}) } @@ -177,22 +185,18 @@ func (sr *StepRunner) runningLoop( StepName: bundle.TestStepLabel, StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), }) - close(sr.stepStopped) - sr.stepStopped = nil sr.mu.Unlock() } }() inChannels := test.TestStepChannels{In: stepIn, Out: stepOut} resultResumeState, err := bundle.TestStep.Run(sr.ctx, inChannels, bundle.Parameters, ev, resumeState) - sr.ctx.Debugf("%s: step runner finished %v, rs %s", bundle.TestStepLabel, err, string(resultResumeState)) + sr.ctx.Debugf("Step runner finished '%v', rs %s", err, string(resultResumeState)) sr.mu.Lock() sr.setErrLocked(err) sr.resultResumeState = resultResumeState - close(sr.stepStopped) - sr.stepStopped = nil - sr.notifyStoppedLocked(nil) + sr.notifyStoppedLocked(err) sr.mu.Unlock() } @@ -252,19 +256,34 @@ func NewStepRunner( srCrx, cancel := xcontext.WithCancel(ctx) sr := &StepRunner{ - ctx: srCrx, - cancel: cancel, - stepIn: stepIn, - addedTargets: make(map[string]OnTargetResult), - stepStopped: make(chan struct{}), - stopCallback: stoppedCallback, + ctx: srCrx, + cancel: cancel, + stepIn: stepIn, + addedTargets: make(map[string]OnTargetResult), + runningLoopActive: true, + stepStopped: make(chan struct{}), + stopCallback: stoppedCallback, + } + + var activeLoopsCount int32 = 2 + onFinished := func() { + if atomic.AddInt32(&activeLoopsCount, -1) != 0 { + return + } + sr.mu.Lock() + defer sr.mu.Unlock() + + close(sr.stepStopped) + sr.stepStopped = nil } go func() { + defer onFinished() sr.runningLoop(stepIn, stepOut, bundle, ev, resumeState) }() go func() { + defer onFinished() sr.readingLoop(stepOut, bundle.TestStepLabel) }() return sr From bc6829d2e4210136cf75a21cb9edb6836d762bac Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 25 Nov 2021 18:13:17 +0000 Subject: [PATCH 05/20] Move stop callback notification on success when both reading and running loops exited Signed-off-by: Ilya --- pkg/runner/step_runner.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 6b007b05..60d91d33 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -28,7 +28,7 @@ type StepRunner struct { addedTargets map[string]OnTargetResult runningLoopActive bool - stepStopped chan struct{} + stopped chan struct{} resultErr error resultResumeState json.RawMessage } @@ -46,7 +46,7 @@ func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) err } if sr.stepIn == nil { - return fmt.Errorf("step runner is stepStopped") + return fmt.Errorf("step runner is stopped") } existingCb := sr.addedTargets[tgt.ID] @@ -75,14 +75,14 @@ func (sr *StepRunner) IsRunning() bool { sr.mu.Lock() defer sr.mu.Unlock() - return sr.stepStopped != nil + return sr.stopped != nil } func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) { sr.mu.Lock() resultErr := sr.resultErr resultResumeState := sr.resultResumeState - stepStopped := sr.stepStopped + stepStopped := sr.stopped sr.mu.Unlock() if resultErr != nil { @@ -176,6 +176,7 @@ func (sr *StepRunner) runningLoop( if recoverOccurred := safeCloseOutCh(stepOut); recoverOccurred { sr.setErr(&cerrors.ErrTestStepClosedChannels{StepName: bundle.TestStepLabel}) } + sr.ctx.Debugf("output channel closed") }() defer func() { @@ -196,7 +197,6 @@ func (sr *StepRunner) runningLoop( sr.mu.Lock() sr.setErrLocked(err) sr.resultResumeState = resultResumeState - sr.notifyStoppedLocked(err) sr.mu.Unlock() } @@ -261,7 +261,7 @@ func NewStepRunner( stepIn: stepIn, addedTargets: make(map[string]OnTargetResult), runningLoopActive: true, - stepStopped: make(chan struct{}), + stopped: make(chan struct{}), stopCallback: stoppedCallback, } @@ -273,8 +273,11 @@ func NewStepRunner( sr.mu.Lock() defer sr.mu.Unlock() - close(sr.stepStopped) - sr.stepStopped = nil + close(sr.stopped) + sr.stopped = nil + + // if an error occurred, this callback was invoked early + sr.notifyStoppedLocked(nil) } go func() { From 9a03b4a251d025273fd18434db981f813610eee4 Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 25 Nov 2021 22:41:26 +0000 Subject: [PATCH 06/20] Start StepRunner via explicit Run method Signed-off-by: Ilya --- pkg/runner/step_runner.go | 165 ++++++++++++++++++++------------------ pkg/runner/test_runner.go | 76 +++++++++--------- 2 files changed, 124 insertions(+), 117 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 60d91d33..266652c5 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -15,29 +15,28 @@ import ( "github.com/linuxboot/contest/pkg/xcontext" ) -type OnTargetResult func(res error) +type OnTargetResult func(tgt *target.Target, res error) type OnStepRunnerStopped func(err error) type StepRunner struct { - ctx xcontext.Context - cancel context.CancelFunc - mu sync.Mutex - stopCallback OnStepRunnerStopped + ctx xcontext.Context + cancel context.CancelFunc + mu sync.Mutex + targetCallback OnTargetResult + stopCallback OnStepRunnerStopped stepIn chan *target.Target - addedTargets map[string]OnTargetResult + stopOnce sync.Once + reportedTargets map[string]struct{} + started uint32 runningLoopActive bool - - stopped chan struct{} + finished chan struct{} resultErr error resultResumeState json.RawMessage } -func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) error { - if callback == nil { - return fmt.Errorf("callback should not be nil") - } - err := func(tgt *target.Target, callback OnTargetResult) error { +func (sr *StepRunner) AddTarget(tgt *target.Target) error { + err := func(tgt *target.Target) error { sr.mu.Lock() defer sr.mu.Unlock() @@ -48,14 +47,8 @@ func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) err if sr.stepIn == nil { return fmt.Errorf("step runner is stopped") } - - existingCb := sr.addedTargets[tgt.ID] - if existingCb != nil { - return fmt.Errorf("existing target") - } - sr.addedTargets[tgt.ID] = callback return nil - }(tgt, callback) + }(tgt) if err != nil { return err } @@ -71,18 +64,61 @@ func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) err return nil } -func (sr *StepRunner) IsRunning() bool { +func (sr *StepRunner) Run( + bundle test.TestStepBundle, + ev testevent.Emitter, + resumeState json.RawMessage, +) { + if !atomic.CompareAndSwapUint32(&sr.started, 0, 1) { + return + } + + var activeLoopsCount int32 = 2 + onFinished := func() { + if atomic.AddInt32(&activeLoopsCount, -1) != 0 { + return + } + sr.mu.Lock() + defer sr.mu.Unlock() + + close(sr.finished) + sr.finished = nil + + // if an error occurred, this callback was invoked early + sr.notifyStoppedLocked(nil) + sr.ctx.Debugf("StepRunner finished") + } + + stepOut := make(chan test.TestStepResult) + go func() { + defer onFinished() + sr.runningLoop(stepOut, bundle, ev, resumeState) + sr.ctx.Debugf("Running loop finished") + }() + + go func() { + defer onFinished() + sr.readingLoop(stepOut, bundle.TestStepLabel) + sr.ctx.Debugf("Reading loop finished") + }() +} + +func (sr *StepRunner) Started() bool { + return atomic.LoadUint32(&sr.started) == 1 +} + +func (sr *StepRunner) Running() bool { sr.mu.Lock() defer sr.mu.Unlock() - return sr.stopped != nil + return sr.Started() && sr.finished != nil } func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) { sr.mu.Lock() resultErr := sr.resultErr resultResumeState := sr.resultResumeState - stepStopped := sr.stopped + stepStopped := sr.finished sr.mu.Unlock() if resultErr != nil { @@ -104,24 +140,22 @@ func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) // Stop triggers TestStep to stop running by closing input channel func (sr *StepRunner) Stop() { - sr.mu.Lock() - defer sr.mu.Unlock() - - if sr.stepIn != nil { - sr.ctx.Debugf("Close input channel") + sr.stopOnce.Do(func() { close(sr.stepIn) - sr.stepIn = nil - } + sr.ctx.Debugf("Input channel was closed") + }) } -func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabel string) { - invokePanicSafe := func(callback OnTargetResult, res error) { +func (sr *StepRunner) targetCallbackPanicSafe(tgt *target.Target, res error) { + defer func() { if r := recover(); r != nil { sr.ctx.Errorf("Callback panic, stack: %s", debug.Stack()) } - callback(res) - } + }() + sr.targetCallback(tgt, res) +} +func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabel string) { for { select { case res, ok := <-stepOut: @@ -141,21 +175,18 @@ func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabe sr.setErr(&cerrors.ErrTestStepReturnedNoTarget{StepName: testStepLabel}) return } + sr.ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID) sr.mu.Lock() - callback, found := sr.addedTargets[res.Target.ID] - sr.addedTargets[res.Target.ID] = nil + _, found := sr.reportedTargets[res.Target.ID] + sr.reportedTargets[res.Target.ID] = struct{}{} sr.mu.Unlock() - if !found { - sr.setErr(&cerrors.ErrTestStepReturnedUnexpectedResult{StepName: testStepLabel, Target: res.Target.ID}) - return - } - if callback == nil { + if found { sr.setErr(&cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}) return } - invokePanicSafe(callback, res.Err) + sr.targetCallbackPanicSafe(res.Target, res.Err) case <-sr.ctx.Done(): sr.ctx.Debugf("canceled readingLoop") @@ -165,7 +196,7 @@ func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabe } func (sr *StepRunner) runningLoop( - stepIn chan *target.Target, stepOut chan test.TestStepResult, + stepOut chan test.TestStepResult, bundle test.TestStepBundle, ev testevent.Emitter, resumeState json.RawMessage, ) { defer func() { @@ -190,9 +221,13 @@ func (sr *StepRunner) runningLoop( } }() - inChannels := test.TestStepChannels{In: stepIn, Out: stepOut} + sr.mu.Lock() + sr.runningLoopActive = true + sr.mu.Unlock() + + inChannels := test.TestStepChannels{In: sr.stepIn, Out: stepOut} resultResumeState, err := bundle.TestStep.Run(sr.ctx, inChannels, bundle.Parameters, ev, resumeState) - sr.ctx.Debugf("Step runner finished '%v', rs %s", err, string(resultResumeState)) + sr.ctx.Debugf("TestStep finished '%v', rs %s", err, string(resultResumeState)) sr.mu.Lock() sr.setErrLocked(err) @@ -246,48 +281,24 @@ func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) { // NewStepRunner creates a new StepRunner object func NewStepRunner( ctx xcontext.Context, - bundle test.TestStepBundle, - ev testevent.Emitter, - resumeState json.RawMessage, + targetCallback OnTargetResult, stoppedCallback OnStepRunnerStopped, ) *StepRunner { + if targetCallback == nil { + panic("target callback should not be nil") + } stepIn := make(chan *target.Target) - stepOut := make(chan test.TestStepResult) srCrx, cancel := xcontext.WithCancel(ctx) sr := &StepRunner{ ctx: srCrx, cancel: cancel, stepIn: stepIn, - addedTargets: make(map[string]OnTargetResult), - runningLoopActive: true, - stopped: make(chan struct{}), + targetCallback: targetCallback, stopCallback: stoppedCallback, + reportedTargets: make(map[string]struct{}), + runningLoopActive: true, + finished: make(chan struct{}), } - - var activeLoopsCount int32 = 2 - onFinished := func() { - if atomic.AddInt32(&activeLoopsCount, -1) != 0 { - return - } - sr.mu.Lock() - defer sr.mu.Unlock() - - close(sr.stopped) - sr.stopped = nil - - // if an error occurred, this callback was invoked early - sr.notifyStoppedLocked(nil) - } - - go func() { - defer onFinished() - sr.runningLoop(stepIn, stepOut, bundle, ev, resumeState) - }() - - go func() { - defer onFinished() - sr.readingLoop(stepOut, bundle.TestStepLabel) - }() return sr } diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 144786de..4d447dcb 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "fmt" + "strconv" "sync" "time" @@ -138,19 +139,41 @@ func (tr *TestRunner) Run( // Set up the pipeline for i, sb := range t.TestStepsBundles { stepCtx, stepCancel := xcontext.WithCancel(stepsCtx) + stepCtx = stepCtx.WithField("step_index", strconv.Itoa(i)) + stepCtx = stepCtx.WithField("step_label", sb.TestStepLabel) + var srs json.RawMessage if i < len(rs.StepResumeState) && string(rs.StepResumeState[i]) != "null" { srs = rs.StepResumeState[i] } - tr.steps = append(tr.steps, &stepState{ + ss := &stepState{ ctx: stepCtx, cancel: stepCancel, stepIndex: i, sb: sb, ev: emitterFactory.New(sb.TestStepLabel), resumeState: srs, - }) + } // Step handlers will be started from target handlers as targets reach them. + ss.stepRunner = NewStepRunner( + ss.ctx, + func(tgt *target.Target, res error) { + if err := tr.reportTargetResult(ss, tgt, res); err != nil { + ctx.Errorf("Reporting target result failed: %v", err) + tr.mu.Lock() + ss.setErrLocked(err) + tr.mu.Unlock() + } + tr.cond.Signal() + }, + func(err error) { + tr.mu.Lock() + defer tr.mu.Unlock() + ss.setErrLocked(err) + tr.cond.Signal() + }, + ) + tr.steps = append(tr.steps, ss) } // Set up the targets @@ -231,13 +254,13 @@ func (tr *TestRunner) Run( if stepErr != nil && stepErr != xcontext.ErrPaused { resumeOk = false } - ctx.Debugf(" %d %s %v %t", i, tgs, stepErr, resumeOk) + ctx.Debugf(" %d %s %v %t", i, tgs.String(), stepErr, resumeOk) } ctx.Debugf("- %d in flight, ok to resume? %t", numInFlightTargets, resumeOk) ctx.Debugf("step states:") for i, ss := range tr.steps { ctx.Debugf(" %d %s %t %t %v %s", - i, ss, ss.stepRunner != nil, ss.stepRunner != nil && ss.stepRunner.IsRunning(), ss.runErr, ss.resumeState) + i, ss, ss.stepRunner.Started(), ss.stepRunner.Running(), ss.runErr, ss.resumeState) } // Is there a useful error to report? @@ -287,14 +310,10 @@ func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { var neverReturnedErr *cerrors.ErrTestStepsNeverReturned for _, ss := range tr.steps { - tr.mu.Lock() - stepRunner := ss.stepRunner - tr.mu.Unlock() - - if stepRunner == nil { + if !ss.stepRunner.Started() { continue } - resumeState, err := stepRunner.WaitResults(shutdownCtx) + resumeState, err := ss.stepRunner.WaitResults(shutdownCtx) if err == context.DeadlineExceeded { err = &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}} if neverReturnedErr == nil { @@ -331,15 +350,7 @@ func (tr *TestRunner) injectTarget(ctx xcontext.Context, tgs *targetState, ss *s ctx.Debugf("%s: injecting into %s", tgs, ss) tgt := tgs.tgt - err := ss.stepRunner.AddTarget(tgt, func(res error) { - if err := tr.reportTargetResult(ss, tgt, res); err != nil { - ctx.Errorf("Reporting target result failed: %v", err) - tr.mu.Lock() - defer tr.mu.Unlock() - ss.setErrLocked(err) - } - tr.cond.Signal() - }) + err := ss.stepRunner.AddTarget(tgt) if err == nil { if err = ss.emitEvent(ctx, target.EventTargetIn, tgs.tgt, nil); err != nil { err = fmt.Errorf("failed to report target injection: %w", err) @@ -493,20 +504,7 @@ loop: // runStepIfNeeded starts the step runner goroutine if not already running. func (tr *TestRunner) runStepIfNeeded(ss *stepState) { - tr.mu.Lock() - defer tr.mu.Unlock() - - if ss.stepRunner != nil { - return - } - ctx := xcontext.WithValue(ss.ctx, "step_index", ss.stepIndex) - ctx = xcontext.WithValue(ctx, "step_label", ss.sb.TestStepLabel) - ss.stepRunner = NewStepRunner(ctx, ss.sb, ss.ev, ss.resumeState, func(err error) { - tr.mu.Lock() - defer tr.mu.Unlock() - ss.setErrLocked(err) - tr.cond.Signal() - }) + ss.stepRunner.Run(ss.sb, ss.ev, ss.resumeState) } // setErrLocked sets step runner error unless already set. @@ -624,11 +622,11 @@ func (tr *TestRunner) runMonitor(ctx xcontext.Context, minStep int) error { stepLoop: for step := minStep; step < len(tr.steps); pass++ { ss := tr.steps[step] - ctx.Debugf("monitor pass %d: current step %s", pass, ss) + ctx.Debugf("monitor pass %d: current step %s", pass, ss.String()) // Check if all the targets have either made it past the injection phase or terminated. ok := true for _, tgs := range tr.targets { - ctx.Debugf("monitor pass %d: %s: %s", pass, ss, tgs) + ctx.Debugf("monitor pass %d: %s: %s", pass, ss, tgs.String()) if !tgs.handlerRunning { // Not running anymore continue } @@ -648,9 +646,7 @@ stepLoop: } // All targets ok, close the step's input channel. ctx.Debugf("monitor pass %d: %s: no more targets, closing input channel", pass, ss) - if ss.stepRunner != nil { - ss.stepRunner.Stop() - } + ss.stepRunner.Stop() step++ } // Wait for all the targets to finish. @@ -671,7 +667,7 @@ tgtLoop: // It's been paused, this is fine. continue } - if ss.stepRunner != nil && !ss.stepRunner.IsRunning() { + if ss.stepRunner.Started() && !ss.stepRunner.Running() { // Target has been injected but step runner has exited without a valid reason, this target has been lost. runErr = &cerrors.ErrTestStepLostTargets{ StepName: ss.sb.TestStepLabel, @@ -741,5 +737,5 @@ func (tgs *targetState) String() string { } finished := !tgs.handlerRunning return fmt.Sprintf("[%s %d %s %t %s]", - tgs.tgt, tgs.CurStep, tgs.CurPhase, finished, resText) + tgs.tgt, tgs.CurStep, tgs.CurPhase.String(), finished, resText) } From 84ad31f2b1467512e81f90315fad0ebec3b14daf Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 25 Nov 2021 23:52:34 +0000 Subject: [PATCH 07/20] Increase successTimeout for integration unit tests Signed-off-by: Ilya --- tests/integ/plugins/testrunner_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integ/plugins/testrunner_test.go b/tests/integ/plugins/testrunner_test.go index df3ddfb1..dcf1d7bc 100644 --- a/tests/integ/plugins/testrunner_test.go +++ b/tests/integ/plugins/testrunner_test.go @@ -46,7 +46,7 @@ var ( var ( pluginRegistry *pluginregistry.PluginRegistry targets []*target.Target - successTimeout = 5 * time.Second + successTimeout = 20 * time.Second ) var testSteps = map[string]test.TestStepFactory{ From c526388136e9e6837d3660dd9dd265940be964a5 Mon Sep 17 00:00:00 2001 From: Ilya Date: Fri, 26 Nov 2021 00:15:43 +0000 Subject: [PATCH 08/20] Fix race condition in awaitTargetResult when both cancel and result ready Signed-off-by: Ilya --- pkg/runner/test_runner.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 4d447dcb..d88ba9d2 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -379,13 +379,7 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, return xcontext.ErrPaused } - select { - case res, ok := <-resCh: - if !ok { - // Channel is closed when job is paused to make sure all results are processed. - ctx.Debugf("%s: result channel closed", tgs) - return xcontext.ErrPaused - } + processResult := func(res error) error { ctx.Debugf("%s: result recd for %s", tgs, ss) var err error if res == nil { @@ -404,12 +398,29 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, tr.mu.Unlock() tr.cond.Signal() return err + } + + select { + case res, ok := <-resCh: + if !ok { + // Channel is closed when job is paused to make sure all results are processed. + ctx.Debugf("%s: result channel closed", tgs) + return xcontext.ErrPaused + } + return processResult(res) // Check for cancellation. // Notably we are not checking for the pause condition here: // when paused, we want to let all the injected targets to finish // and collect all the results they produce. If that doesn't happen, // step runner will close resCh on its way out and unblock us. case <-ctx.Done(): + // we might have a race-condition here when both events happen + // in this case we should prioritise result processing + select { + case res := <-resCh: + return processResult(res) + default: + } tr.mu.Lock() ctx.Debugf("%s: canceled 2", tgs) tr.mu.Unlock() From 63859b097ba72726a147c26ce486629f245f0821 Mon Sep 17 00:00:00 2001 From: Ilya Date: Fri, 26 Nov 2021 00:35:19 +0000 Subject: [PATCH 09/20] Fix race condition in accessing StepState.resumeState Signed-off-by: Ilya --- pkg/runner/test_runner.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index d88ba9d2..2aa28c56 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -515,7 +515,12 @@ loop: // runStepIfNeeded starts the step runner goroutine if not already running. func (tr *TestRunner) runStepIfNeeded(ss *stepState) { - ss.stepRunner.Run(ss.sb, ss.ev, ss.resumeState) + tr.mu.Lock() + resumeState := ss.resumeState + ss.resumeState = nil + tr.mu.Unlock() + + ss.stepRunner.Run(ss.sb, ss.ev, resumeState) } // setErrLocked sets step runner error unless already set. From 4d1094e398a75963d49a62359da2c157d77a8648 Mon Sep 17 00:00:00 2001 From: Ilya Date: Fri, 26 Nov 2021 12:11:09 +0000 Subject: [PATCH 10/20] Remove unnecessary check StepRunner.AddTarget Signed-off-by: Ilya --- pkg/runner/step_runner.go | 34 +++++++++------------------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 266652c5..30d5d9de 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -30,29 +30,13 @@ type StepRunner struct { reportedTargets map[string]struct{} started uint32 runningLoopActive bool - finished chan struct{} + finishedCh chan struct{} + resultErr error resultResumeState json.RawMessage } func (sr *StepRunner) AddTarget(tgt *target.Target) error { - err := func(tgt *target.Target) error { - sr.mu.Lock() - defer sr.mu.Unlock() - - if sr.ctx.Err() != nil { - return sr.ctx.Err() - } - - if sr.stepIn == nil { - return fmt.Errorf("step runner is stopped") - } - return nil - }(tgt) - if err != nil { - return err - } - // check if running select { case sr.stepIn <- tgt: @@ -81,8 +65,8 @@ func (sr *StepRunner) Run( sr.mu.Lock() defer sr.mu.Unlock() - close(sr.finished) - sr.finished = nil + close(sr.finishedCh) + sr.finishedCh = nil // if an error occurred, this callback was invoked early sr.notifyStoppedLocked(nil) @@ -111,25 +95,25 @@ func (sr *StepRunner) Running() bool { sr.mu.Lock() defer sr.mu.Unlock() - return sr.Started() && sr.finished != nil + return sr.Started() && sr.finishedCh != nil } func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) { sr.mu.Lock() resultErr := sr.resultErr resultResumeState := sr.resultResumeState - stepStopped := sr.finished + stepFinished := sr.finishedCh sr.mu.Unlock() if resultErr != nil { return resultResumeState, resultErr } - if stepStopped != nil { + if stepFinished != nil { select { case <-ctx.Done(): return nil, ctx.Err() - case <-stepStopped: + case <-stepFinished: } } @@ -298,7 +282,7 @@ func NewStepRunner( stopCallback: stoppedCallback, reportedTargets: make(map[string]struct{}), runningLoopActive: true, - finished: make(chan struct{}), + finishedCh: make(chan struct{}), } return sr } From feacc0640bb5f0c959d487db16ece2029199ce2f Mon Sep 17 00:00:00 2001 From: Ilya Date: Fri, 26 Nov 2021 17:16:57 +0000 Subject: [PATCH 11/20] Push result to target result channel without ctx.Done() check to avoid race condition. Signed-off-by: Ilya --- pkg/runner/test_runner.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 2aa28c56..7b3dbeea 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -588,12 +588,9 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res if err != nil { return err } - select { - case resCh <- res: - break - case <-ss.ctx.Done(): - break - } + + // As it has buffer size 1 and we process a single result for each target at a time + resCh <- res return nil } From adcf8833860f66cadf2f22dfe290c6429f9ed6d2 Mon Sep 17 00:00:00 2001 From: Ilya Date: Fri, 26 Nov 2021 22:08:38 +0000 Subject: [PATCH 12/20] Minor code style improvement Signed-off-by: Ilya --- pkg/runner/step_runner.go | 15 ++++++++------- pkg/runner/test_runner.go | 8 ++++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 30d5d9de..9fdb62b3 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -19,17 +19,19 @@ type OnTargetResult func(tgt *target.Target, res error) type OnStepRunnerStopped func(err error) type StepRunner struct { - ctx xcontext.Context - cancel context.CancelFunc - mu sync.Mutex + ctx xcontext.Context + cancel context.CancelFunc + mu sync.Mutex + targetCallback OnTargetResult stopCallback OnStepRunnerStopped - stepIn chan *target.Target - stopOnce sync.Once - reportedTargets map[string]struct{} + stepIn chan *target.Target + reportedTargets map[string]struct{} + started uint32 runningLoopActive bool + stopOnce sync.Once finishedCh chan struct{} resultErr error @@ -37,7 +39,6 @@ type StepRunner struct { } func (sr *StepRunner) AddTarget(tgt *target.Target) error { - // check if running select { case sr.stepIn <- tgt: case <-sr.ctx.Until(xcontext.ErrPaused): diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 7b3dbeea..c3f4b767 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -254,7 +254,7 @@ func (tr *TestRunner) Run( if stepErr != nil && stepErr != xcontext.ErrPaused { resumeOk = false } - ctx.Debugf(" %d %s %v %t", i, tgs.String(), stepErr, resumeOk) + ctx.Debugf(" %d %s %v %t", i, tgs, stepErr, resumeOk) } ctx.Debugf("- %d in flight, ok to resume? %t", numInFlightTargets, resumeOk) ctx.Debugf("step states:") @@ -635,11 +635,11 @@ func (tr *TestRunner) runMonitor(ctx xcontext.Context, minStep int) error { stepLoop: for step := minStep; step < len(tr.steps); pass++ { ss := tr.steps[step] - ctx.Debugf("monitor pass %d: current step %s", pass, ss.String()) + ctx.Debugf("monitor pass %d: current step %s", pass, ss) // Check if all the targets have either made it past the injection phase or terminated. ok := true for _, tgs := range tr.targets { - ctx.Debugf("monitor pass %d: %s: %s", pass, ss, tgs.String()) + ctx.Debugf("monitor pass %d: %s: %s", pass, ss, tgs) if !tgs.handlerRunning { // Not running anymore continue } @@ -750,5 +750,5 @@ func (tgs *targetState) String() string { } finished := !tgs.handlerRunning return fmt.Sprintf("[%s %d %s %t %s]", - tgs.tgt, tgs.CurStep, tgs.CurPhase.String(), finished, resText) + tgs.tgt, tgs.CurStep, tgs.CurPhase, finished, resText) } From 23f6645b6f57dd3bb963e4bd3c5f2652256c1e56 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Dec 2021 21:22:01 +0000 Subject: [PATCH 13/20] Improve code-style StepRunner.Run() consumes output callbacks Signed-off-by: Ilya --- pkg/cerrors/cerrors.go | 7 +++ pkg/runner/step_runner.go | 99 ++++++++++++++------------------ pkg/runner/test_runner.go | 117 ++++++++++++++++++++------------------ 3 files changed, 111 insertions(+), 112 deletions(-) diff --git a/pkg/cerrors/cerrors.go b/pkg/cerrors/cerrors.go index 5d3f36e5..eb7d19db 100644 --- a/pkg/cerrors/cerrors.go +++ b/pkg/cerrors/cerrors.go @@ -10,6 +10,13 @@ import ( "strings" ) +type ErrAlreadyDone struct { +} + +func (e *ErrAlreadyDone) Error() string { + return "already done" +} + // ErrTestStepsNeverReturned indicates that one or multiple TestSteps // did not complete when the test terminated or when the pipeline // received a cancellation or pause signal diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 9fdb62b3..c807b2d4 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -50,16 +50,28 @@ func (sr *StepRunner) AddTarget(tgt *target.Target) error { } func (sr *StepRunner) Run( + ctx xcontext.Context, bundle test.TestStepBundle, ev testevent.Emitter, resumeState json.RawMessage, -) { + targetCallback OnTargetResult, + stopCallback OnStepRunnerStopped, +) error { + if targetCallback == nil { + return fmt.Errorf("target callback should not be nil") + } if !atomic.CompareAndSwapUint32(&sr.started, 0, 1) { - return + return &cerrors.ErrAlreadyDone{} } + srCtx, cancel := xcontext.WithCancel(ctx) + sr.ctx = srCtx + sr.cancel = cancel + sr.targetCallback = targetCallback + sr.stopCallback = stopCallback + var activeLoopsCount int32 = 2 - onFinished := func() { + finish := func() { if atomic.AddInt32(&activeLoopsCount, -1) != 0 { return } @@ -71,21 +83,22 @@ func (sr *StepRunner) Run( // if an error occurred, this callback was invoked early sr.notifyStoppedLocked(nil) - sr.ctx.Debugf("StepRunner finished") + srCtx.Debugf("StepRunner finished") } stepOut := make(chan test.TestStepResult) go func() { - defer onFinished() + defer finish() sr.runningLoop(stepOut, bundle, ev, resumeState) - sr.ctx.Debugf("Running loop finished") + srCtx.Debugf("Running loop finished") }() go func() { - defer onFinished() + defer finish() sr.readingLoop(stepOut, bundle.TestStepLabel) - sr.ctx.Debugf("Reading loop finished") + srCtx.Debugf("Reading loop finished") }() + return nil } func (sr *StepRunner) Started() bool { @@ -103,18 +116,18 @@ func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) sr.mu.Lock() resultErr := sr.resultErr resultResumeState := sr.resultResumeState - stepFinished := sr.finishedCh + finishedCh := sr.finishedCh sr.mu.Unlock() if resultErr != nil { return resultResumeState, resultErr } - if stepFinished != nil { + if finishedCh != nil { select { case <-ctx.Done(): return nil, ctx.Err() - case <-stepFinished: + case <-finishedCh: } } @@ -127,19 +140,9 @@ func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) func (sr *StepRunner) Stop() { sr.stopOnce.Do(func() { close(sr.stepIn) - sr.ctx.Debugf("Input channel was closed") }) } -func (sr *StepRunner) targetCallbackPanicSafe(tgt *target.Target, res error) { - defer func() { - if r := recover(); r != nil { - sr.ctx.Errorf("Callback panic, stack: %s", debug.Stack()) - } - }() - sr.targetCallback(tgt, res) -} - func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabel string) { for { select { @@ -171,7 +174,7 @@ func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabe sr.setErr(&cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}) return } - sr.targetCallbackPanicSafe(res.Target, res.Err) + sr.targetCallback(res.Target, res.Err) case <-sr.ctx.Done(): sr.ctx.Debugf("canceled readingLoop") @@ -195,23 +198,25 @@ func (sr *StepRunner) runningLoop( sr.ctx.Debugf("output channel closed") }() - defer func() { - if r := recover(); r != nil { - sr.mu.Lock() - sr.setErrLocked(&cerrors.ErrTestStepPaniced{ - StepName: bundle.TestStepLabel, - StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), - }) - sr.mu.Unlock() - } - }() - sr.mu.Lock() sr.runningLoopActive = true sr.mu.Unlock() - inChannels := test.TestStepChannels{In: sr.stepIn, Out: stepOut} - resultResumeState, err := bundle.TestStep.Run(sr.ctx, inChannels, bundle.Parameters, ev, resumeState) + resultResumeState, err := func() (json.RawMessage, error) { + defer func() { + if r := recover(); r != nil { + sr.mu.Lock() + sr.setErrLocked(&cerrors.ErrTestStepPaniced{ + StepName: bundle.TestStepLabel, + StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), + }) + sr.mu.Unlock() + } + }() + + inChannels := test.TestStepChannels{In: sr.stepIn, Out: stepOut} + return bundle.TestStep.Run(sr.ctx, inChannels, bundle.Parameters, ev, resumeState) + }() sr.ctx.Debugf("TestStep finished '%v', rs %s", err, string(resultResumeState)) sr.mu.Lock() @@ -245,9 +250,6 @@ func (sr *StepRunner) notifyStoppedLocked(err error) { sr.stopCallback = nil go func() { - if r := recover(); r != nil { - sr.ctx.Errorf("Stop callback panic, stack: %s", debug.Stack()) - } stopCallback(err) }() } @@ -264,26 +266,11 @@ func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) { } // NewStepRunner creates a new StepRunner object -func NewStepRunner( - ctx xcontext.Context, - targetCallback OnTargetResult, - stoppedCallback OnStepRunnerStopped, -) *StepRunner { - if targetCallback == nil { - panic("target callback should not be nil") - } - stepIn := make(chan *target.Target) - - srCrx, cancel := xcontext.WithCancel(ctx) - sr := &StepRunner{ - ctx: srCrx, - cancel: cancel, - stepIn: stepIn, - targetCallback: targetCallback, - stopCallback: stoppedCallback, +func NewStepRunner() *StepRunner { + return &StepRunner{ + stepIn: make(chan *target.Target), reportedTargets: make(map[string]struct{}), runningLoopActive: true, finishedCh: make(chan struct{}), } - return sr } diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index c3f4b767..c52a0703 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -8,6 +8,7 @@ package runner import ( "context" "encoding/json" + "errors" "fmt" "strconv" "sync" @@ -48,8 +49,8 @@ type TestRunner struct { // One mutex to rule them all, used to serialize access to all the state above. // Could probably be split into several if necessary. - mu sync.Mutex - cond *sync.Cond // Used to notify the monitor about changes + mu sync.Mutex + monitorCond *sync.Cond // Used to notify the monitor about changes } // stepState contains state associated with one state of the pipeline: @@ -152,27 +153,10 @@ func (tr *TestRunner) Run( stepIndex: i, sb: sb, ev: emitterFactory.New(sb.TestStepLabel), + stepRunner: NewStepRunner(), resumeState: srs, } // Step handlers will be started from target handlers as targets reach them. - ss.stepRunner = NewStepRunner( - ss.ctx, - func(tgt *target.Target, res error) { - if err := tr.reportTargetResult(ss, tgt, res); err != nil { - ctx.Errorf("Reporting target result failed: %v", err) - tr.mu.Lock() - ss.setErrLocked(err) - tr.mu.Unlock() - } - tr.cond.Signal() - }, - func(err error) { - tr.mu.Lock() - defer tr.mu.Unlock() - ss.setErrLocked(err) - tr.cond.Signal() - }, - ) tr.steps = append(tr.steps, ss) } @@ -308,7 +292,7 @@ func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { shutdownCtx, cancel := context.WithTimeout(ctx, tr.shutdownTimeout) defer cancel() - var neverReturnedErr *cerrors.ErrTestStepsNeverReturned + var stepsNeverReturned []string for _, ss := range tr.steps { if !ss.stepRunner.Started() { continue @@ -316,10 +300,7 @@ func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { resumeState, err := ss.stepRunner.WaitResults(shutdownCtx) if err == context.DeadlineExceeded { err = &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}} - if neverReturnedErr == nil { - neverReturnedErr = &cerrors.ErrTestStepsNeverReturned{} - } - neverReturnedErr.StepNames = append(neverReturnedErr.StepNames, ss.sb.TestStepLabel) + stepsNeverReturned = append(stepsNeverReturned, ss.sb.TestStepLabel) // Cancel this step's context, this will help release the reader. ss.cancel() } @@ -329,19 +310,21 @@ func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { tr.mu.Unlock() } - tr.mu.Lock() for _, ss := range tr.steps { - if ss.runErr != nil && ss.runErr != xcontext.ErrPaused && ss.runErr != xcontext.ErrCanceled { - if err := ss.emitEvent(ctx, EventTestError, nil, ss.runErr.Error()); err != nil { + tr.mu.Lock() + stepErr := ss.runErr + tr.mu.Unlock() + + if stepErr != nil && stepErr != xcontext.ErrPaused && stepErr != xcontext.ErrCanceled { + if err := ss.emitEvent(ctx, EventTestError, nil, stepErr.Error()); err != nil { ctx.Errorf("failed to emit event: %s", err) } } } - tr.mu.Unlock() resultErr := tr.checkStepRunnersLocked() - if neverReturnedErr != nil && resultErr == nil { - resultErr = neverReturnedErr + if len(stepsNeverReturned) > 0 && resultErr == nil { + resultErr = &cerrors.ErrTestStepsNeverReturned{StepNames: stepsNeverReturned} } return resultErr } @@ -355,16 +338,15 @@ func (tr *TestRunner) injectTarget(ctx xcontext.Context, tgs *targetState, ss *s if err = ss.emitEvent(ctx, target.EventTargetIn, tgs.tgt, nil); err != nil { err = fmt.Errorf("failed to report target injection: %w", err) } - func() { - tr.mu.Lock() - defer tr.mu.Unlock() - // By the time we get here the target could have been processed and result posted already, hence the check. - if tgs.CurPhase == targetStepPhaseBegin { - tgs.CurPhase = targetStepPhaseRun - } - }() + + tr.mu.Lock() + // By the time we get here the target could have been processed and result posted already, hence the check. + if tgs.CurPhase == targetStepPhaseBegin { + tgs.CurPhase = targetStepPhaseRun + } + tr.mu.Unlock() } - tr.cond.Signal() + tr.monitorCond.Signal() return err } @@ -379,7 +361,7 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, return xcontext.ErrPaused } - processResult := func(res error) error { + processTargetResult := func(res error) error { ctx.Debugf("%s: result recd for %s", tgs, ss) var err error if res == nil { @@ -396,7 +378,7 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, } tgs.CurPhase = targetStepPhaseEnd tr.mu.Unlock() - tr.cond.Signal() + tr.monitorCond.Signal() return err } @@ -407,7 +389,7 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, ctx.Debugf("%s: result channel closed", tgs) return xcontext.ErrPaused } - return processResult(res) + return processTargetResult(res) // Check for cancellation. // Notably we are not checking for the pause condition here: // when paused, we want to let all the injected targets to finish @@ -418,7 +400,7 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, // in this case we should prioritise result processing select { case res := <-resCh: - return processResult(res) + return processTargetResult(res) default: } tr.mu.Lock() @@ -483,12 +465,12 @@ loop: } tr.mu.Lock() if err != nil { - ss.ctx.Errorf("%s", err) + ctx.Errorf("%s", err) switch err { case xcontext.ErrPaused: - ss.ctx.Debugf("%s: paused 1", tgs) + ctx.Debugf("%s: paused 1", tgs) case xcontext.ErrCanceled: - ss.ctx.Debugf("%s: canceled 1", tgs) + ctx.Debugf("%s: canceled 1", tgs) default: ss.setErrLocked(err) } @@ -509,7 +491,7 @@ loop: tr.mu.Lock() ctx.Debugf("%s: target handler finished", tgs) tgs.handlerRunning = false - tr.cond.Signal() + tr.monitorCond.Signal() tr.mu.Unlock() } @@ -520,7 +502,26 @@ func (tr *TestRunner) runStepIfNeeded(ss *stepState) { ss.resumeState = nil tr.mu.Unlock() - ss.stepRunner.Run(ss.sb, ss.ev, resumeState) + err := ss.stepRunner.Run(ss.ctx, ss.sb, ss.ev, resumeState, + func(tgt *target.Target, res error) { + if err := tr.reportTargetResult(ss.ctx, ss, tgt, res); err != nil { + ss.ctx.Errorf("Reporting target result failed: %v", err) + tr.mu.Lock() + ss.setErrLocked(err) + tr.mu.Unlock() + } + tr.monitorCond.Signal() + }, + func(err error) { + tr.mu.Lock() + defer tr.mu.Unlock() + ss.setErrLocked(err) + tr.monitorCond.Signal() + }, + ) + if err != nil && !errors.Is(err, &cerrors.ErrAlreadyDone{}) { + ss.setErrLocked(err) + } } // setErrLocked sets step runner error unless already set. @@ -552,7 +553,7 @@ func (ss *stepState) emitEvent(ctx xcontext.Context, name event.Name, tgt *targe } // reportTargetResult reports result of executing a step to the appropriate target handler. -func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res error) error { +func (tr *TestRunner) reportTargetResult(ctx xcontext.Context, ss *stepState, tgt *target.Target, res error) error { resCh, err := func() (chan error, error) { tr.mu.Lock() defer tr.mu.Unlock() @@ -574,7 +575,7 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res if tgs.resCh == nil { // If canceled or paused, target handler may have left early. We don't care though. select { - case <-ss.ctx.Done(): + case <-ctx.Done(): return nil, xcontext.ErrCanceled default: // This should not happen, must be an internal error. @@ -582,7 +583,7 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res } } tgs.CurPhase = targetStepPhaseResultPending - ss.ctx.Debugf("%s: result for %s: %v", ss, tgs, res) + ctx.Debugf("%s: result for %s: %v", ss, tgs, res) return tgs.resCh, nil }() if err != nil { @@ -590,7 +591,11 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res } // As it has buffer size 1 and we process a single result for each target at a time - resCh <- res + select { + case resCh <- res: + default: + panic("WOW") + } return nil } @@ -654,7 +659,7 @@ stepLoop: } if !ok { // Wait for notification: as progress is being made, we get notified. - tr.cond.Wait() + tr.monitorCond.Wait() continue } // All targets ok, close the step's input channel. @@ -697,7 +702,7 @@ tgtLoop: break } // Wait for notification: as progress is being made, we get notified. - tr.cond.Wait() + tr.monitorCond.Wait() } ctx.Debugf("monitor: finished, %v", runErr) return runErr @@ -707,7 +712,7 @@ func NewTestRunnerWithTimeouts(shutdownTimeout time.Duration) *TestRunner { tr := &TestRunner{ shutdownTimeout: shutdownTimeout, } - tr.cond = sync.NewCond(&tr.mu) + tr.monitorCond = sync.NewCond(&tr.mu) return tr } From c523ce26427bbfce208c9dd5b9fd84f00112141f Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Dec 2021 22:41:33 +0000 Subject: [PATCH 14/20] Remove context from StepRunner Signed-off-by: Ilya --- pkg/runner/step_runner.go | 76 ++++++++++++++++++++------------------- pkg/runner/test_runner.go | 8 ++--- 2 files changed, 41 insertions(+), 43 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index c807b2d4..2ecf146d 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -19,12 +19,7 @@ type OnTargetResult func(tgt *target.Target, res error) type OnStepRunnerStopped func(err error) type StepRunner struct { - ctx xcontext.Context - cancel context.CancelFunc - mu sync.Mutex - - targetCallback OnTargetResult - stopCallback OnStepRunnerStopped + mu sync.Mutex stepIn chan *target.Target reportedTargets map[string]struct{} @@ -36,15 +31,16 @@ type StepRunner struct { resultErr error resultResumeState json.RawMessage + stopCallback OnStepRunnerStopped } -func (sr *StepRunner) AddTarget(tgt *target.Target) error { +func (sr *StepRunner) AddTarget(ctx xcontext.Context, tgt *target.Target) error { select { case sr.stepIn <- tgt: - case <-sr.ctx.Until(xcontext.ErrPaused): + case <-ctx.Until(xcontext.ErrPaused): return xcontext.ErrPaused - case <-sr.ctx.Done(): - return sr.ctx.Err() + case <-ctx.Done(): + return ctx.Err() } return nil } @@ -64,10 +60,7 @@ func (sr *StepRunner) Run( return &cerrors.ErrAlreadyDone{} } - srCtx, cancel := xcontext.WithCancel(ctx) - sr.ctx = srCtx - sr.cancel = cancel - sr.targetCallback = targetCallback + srCtx, _ := xcontext.WithCancel(ctx) sr.stopCallback = stopCallback var activeLoopsCount int32 = 2 @@ -89,13 +82,13 @@ func (sr *StepRunner) Run( stepOut := make(chan test.TestStepResult) go func() { defer finish() - sr.runningLoop(stepOut, bundle, ev, resumeState) + sr.runningLoop(ctx, stepOut, bundle, ev, resumeState) srCtx.Debugf("Running loop finished") }() go func() { defer finish() - sr.readingLoop(stepOut, bundle.TestStepLabel) + sr.readingLoop(ctx, stepOut, bundle.TestStepLabel, targetCallback) srCtx.Debugf("Reading loop finished") }() return nil @@ -143,27 +136,34 @@ func (sr *StepRunner) Stop() { }) } -func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabel string) { +func (sr *StepRunner) readingLoop( + ctx xcontext.Context, + stepOut chan test.TestStepResult, + testStepLabel string, + targetCallback OnTargetResult, +) { + + cancelCh := ctx.Done() for { select { case res, ok := <-stepOut: if !ok { - sr.ctx.Debugf("Output channel closed") + ctx.Debugf("Output channel closed") sr.mu.Lock() if sr.runningLoopActive { // This means that plugin closed its channels before leaving. - sr.setErrLocked(&cerrors.ErrTestStepClosedChannels{StepName: testStepLabel}) + sr.setErrLocked(ctx, &cerrors.ErrTestStepClosedChannels{StepName: testStepLabel}) } sr.mu.Unlock() return } if res.Target == nil { - sr.setErr(&cerrors.ErrTestStepReturnedNoTarget{StepName: testStepLabel}) + sr.setErr(ctx, &cerrors.ErrTestStepReturnedNoTarget{StepName: testStepLabel}) return } - sr.ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID) + ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID) sr.mu.Lock() _, found := sr.reportedTargets[res.Target.ID] @@ -171,21 +171,24 @@ func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabe sr.mu.Unlock() if found { - sr.setErr(&cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}) + sr.setErr(ctx, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}) return } - sr.targetCallback(res.Target, res.Err) + targetCallback(res.Target, res.Err) - case <-sr.ctx.Done(): - sr.ctx.Debugf("canceled readingLoop") + case <-cancelCh: + ctx.Debugf("reading loop detected context canceled") return } } } func (sr *StepRunner) runningLoop( + ctx xcontext.Context, stepOut chan test.TestStepResult, - bundle test.TestStepBundle, ev testevent.Emitter, resumeState json.RawMessage, + bundle test.TestStepBundle, + ev testevent.Emitter, + resumeState json.RawMessage, ) { defer func() { sr.mu.Lock() @@ -193,9 +196,9 @@ func (sr *StepRunner) runningLoop( sr.mu.Unlock() if recoverOccurred := safeCloseOutCh(stepOut); recoverOccurred { - sr.setErr(&cerrors.ErrTestStepClosedChannels{StepName: bundle.TestStepLabel}) + sr.setErr(ctx, &cerrors.ErrTestStepClosedChannels{StepName: bundle.TestStepLabel}) } - sr.ctx.Debugf("output channel closed") + ctx.Debugf("output channel closed") }() sr.mu.Lock() @@ -206,7 +209,7 @@ func (sr *StepRunner) runningLoop( defer func() { if r := recover(); r != nil { sr.mu.Lock() - sr.setErrLocked(&cerrors.ErrTestStepPaniced{ + sr.setErrLocked(ctx, &cerrors.ErrTestStepPaniced{ StepName: bundle.TestStepLabel, StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), }) @@ -215,34 +218,33 @@ func (sr *StepRunner) runningLoop( }() inChannels := test.TestStepChannels{In: sr.stepIn, Out: stepOut} - return bundle.TestStep.Run(sr.ctx, inChannels, bundle.Parameters, ev, resumeState) + return bundle.TestStep.Run(ctx, inChannels, bundle.Parameters, ev, resumeState) }() - sr.ctx.Debugf("TestStep finished '%v', rs %s", err, string(resultResumeState)) + ctx.Debugf("TestStep finished '%v', rs %s", err, string(resultResumeState)) sr.mu.Lock() - sr.setErrLocked(err) + sr.setErrLocked(ctx, err) sr.resultResumeState = resultResumeState sr.mu.Unlock() } // setErr sets step runner error unless already set. -func (sr *StepRunner) setErr(err error) { +func (sr *StepRunner) setErr(ctx xcontext.Context, err error) { sr.mu.Lock() defer sr.mu.Unlock() - sr.setErrLocked(err) + sr.setErrLocked(ctx, err) } -func (sr *StepRunner) setErrLocked(err error) { +func (sr *StepRunner) setErrLocked(ctx xcontext.Context, err error) { if err == nil || sr.resultErr != nil { return } - sr.ctx.Errorf("err: %v", err) + ctx.Errorf("err: %v", err) sr.resultErr = err sr.notifyStoppedLocked(sr.resultErr) } func (sr *StepRunner) notifyStoppedLocked(err error) { - sr.cancel() if sr.stopCallback == nil { return } diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index c52a0703..f0aa7b6e 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -333,7 +333,7 @@ func (tr *TestRunner) injectTarget(ctx xcontext.Context, tgs *targetState, ss *s ctx.Debugf("%s: injecting into %s", tgs, ss) tgt := tgs.tgt - err := ss.stepRunner.AddTarget(tgt) + err := ss.stepRunner.AddTarget(ctx, tgt) if err == nil { if err = ss.emitEvent(ctx, target.EventTargetIn, tgs.tgt, nil); err != nil { err = fmt.Errorf("failed to report target injection: %w", err) @@ -591,11 +591,7 @@ func (tr *TestRunner) reportTargetResult(ctx xcontext.Context, ss *stepState, tg } // As it has buffer size 1 and we process a single result for each target at a time - select { - case resCh <- res: - default: - panic("WOW") - } + resCh <- res return nil } From 462f278ab1c97bd091f273fd978eac61c8d7de20 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sun, 5 Dec 2021 22:43:17 +0000 Subject: [PATCH 15/20] Properly handle an error from StepRunner.Run Signed-off-by: Ilya --- pkg/runner/test_runner.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index f0aa7b6e..e9cf6360 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -452,10 +452,9 @@ loop: } tr.mu.Unlock() // Make sure we have a step runner active. If not, start one. - tr.runStepIfNeeded(ss) + err := tr.runStepIfNeeded(ss) // Inject the target. - var err error - if inject { + if err == nil && inject { err = tr.injectTarget(ctx, tgs, ss) } // Await result. It will be communicated to us by the step runner @@ -496,7 +495,7 @@ loop: } // runStepIfNeeded starts the step runner goroutine if not already running. -func (tr *TestRunner) runStepIfNeeded(ss *stepState) { +func (tr *TestRunner) runStepIfNeeded(ss *stepState) error { tr.mu.Lock() resumeState := ss.resumeState ss.resumeState = nil @@ -519,9 +518,10 @@ func (tr *TestRunner) runStepIfNeeded(ss *stepState) { tr.monitorCond.Signal() }, ) - if err != nil && !errors.Is(err, &cerrors.ErrAlreadyDone{}) { - ss.setErrLocked(err) + if errors.Is(err, &cerrors.ErrAlreadyDone{}) { + return nil } + return err } // setErrLocked sets step runner error unless already set. From 759fc76ecd877ccd3f92676b75a5f67ba3b7fc60 Mon Sep 17 00:00:00 2001 From: Ilya Date: Mon, 6 Dec 2021 13:49:52 +0000 Subject: [PATCH 16/20] Add test description for ErrAlreadyDone Signed-off-by: Ilya --- pkg/cerrors/cerrors.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/cerrors/cerrors.go b/pkg/cerrors/cerrors.go index eb7d19db..333a8527 100644 --- a/pkg/cerrors/cerrors.go +++ b/pkg/cerrors/cerrors.go @@ -10,6 +10,7 @@ import ( "strings" ) +// ErrAlreadyDone indicates that action already happened type ErrAlreadyDone struct { } From 566c2a13444e78e6adf5f04e6ba229143f9d15c8 Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 9 Dec 2021 23:27:10 +0000 Subject: [PATCH 17/20] Refactor reportTargetResult Clear StepRunner.WaitResult() signature Signed-off-by: Ilya --- pkg/runner/step_runner.go | 46 ++++++++++++++++++----- pkg/runner/test_runner.go | 68 ++++++++++++++++------------------ pkg/runner/test_runner_test.go | 20 +++++----- 3 files changed, 78 insertions(+), 56 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 2ecf146d..3d19f8d3 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -18,13 +18,18 @@ import ( type OnTargetResult func(tgt *target.Target, res error) type OnStepRunnerStopped func(err error) +type StepResult struct { + Err error + ResumeState json.RawMessage +} + type StepRunner struct { mu sync.Mutex stepIn chan *target.Target reportedTargets map[string]struct{} - started uint32 + started bool runningLoopActive bool stopOnce sync.Once finishedCh chan struct{} @@ -56,8 +61,18 @@ func (sr *StepRunner) Run( if targetCallback == nil { return fmt.Errorf("target callback should not be nil") } - if !atomic.CompareAndSwapUint32(&sr.started, 0, 1) { - return &cerrors.ErrAlreadyDone{} + + err := func() error { + sr.mu.Lock() + defer sr.mu.Unlock() + if sr.started { + return &cerrors.ErrAlreadyDone{} + } + sr.started = true + return nil + }() + if err != nil { + return err } srCtx, _ := xcontext.WithCancel(ctx) @@ -95,38 +110,51 @@ func (sr *StepRunner) Run( } func (sr *StepRunner) Started() bool { - return atomic.LoadUint32(&sr.started) == 1 + sr.mu.Lock() + defer sr.mu.Unlock() + + return sr.started } func (sr *StepRunner) Running() bool { sr.mu.Lock() defer sr.mu.Unlock() - return sr.Started() && sr.finishedCh != nil + return sr.started && sr.finishedCh != nil } -func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) { +// WaitResults returns TestStep.Run() output +// It returns an error if and only if waiting was terminated by input ctx argument and returns ctx.Err() +func (sr *StepRunner) WaitResults(ctx context.Context) (stepResult StepResult, err error) { sr.mu.Lock() resultErr := sr.resultErr resultResumeState := sr.resultResumeState finishedCh := sr.finishedCh sr.mu.Unlock() + // StepRunner either finished with error or behaved incorrectly + // it makes no sense to wait while it finishes, return what we have if resultErr != nil { - return resultResumeState, resultErr + return StepResult{ + Err: resultErr, + ResumeState: resultResumeState, + }, nil } if finishedCh != nil { select { case <-ctx.Done(): - return nil, ctx.Err() + return StepResult{}, ctx.Err() case <-finishedCh: } } sr.mu.Lock() defer sr.mu.Unlock() - return sr.resultResumeState, sr.resultErr + return StepResult{ + Err: resultErr, + ResumeState: resultResumeState, + }, nil } // Stop triggers TestStep to stop running by closing input channel diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index e9cf6360..da17068e 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -297,16 +297,16 @@ func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { if !ss.stepRunner.Started() { continue } - resumeState, err := ss.stepRunner.WaitResults(shutdownCtx) - if err == context.DeadlineExceeded { - err = &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}} + result, err := ss.stepRunner.WaitResults(shutdownCtx) + if err != nil { + result.Err = &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}} stepsNeverReturned = append(stepsNeverReturned, ss.sb.TestStepLabel) // Cancel this step's context, this will help release the reader. ss.cancel() } tr.mu.Lock() - ss.resumeState = resumeState - ss.setErrLocked(err) + ss.resumeState = result.ResumeState + ss.setErrLocked(result.Err) tr.mu.Unlock() } @@ -554,44 +554,38 @@ func (ss *stepState) emitEvent(ctx xcontext.Context, name event.Name, tgt *targe // reportTargetResult reports result of executing a step to the appropriate target handler. func (tr *TestRunner) reportTargetResult(ctx xcontext.Context, ss *stepState, tgt *target.Target, res error) error { - resCh, err := func() (chan error, error) { - tr.mu.Lock() - defer tr.mu.Unlock() - tgs := tr.targets[tgt.ID] - if tgs == nil { - return nil, &cerrors.ErrTestStepReturnedUnexpectedResult{ - StepName: ss.sb.TestStepLabel, - Target: tgt.ID, - } + tr.mu.Lock() + defer tr.mu.Unlock() + tgs := tr.targets[tgt.ID] + if tgs == nil { + return &cerrors.ErrTestStepReturnedUnexpectedResult{ + StepName: ss.sb.TestStepLabel, + Target: tgt.ID, } - // Begin is also allowed here because it may happen that we get a result before target handler updates phase. - if tgs.CurStep != ss.stepIndex || - (tgs.CurPhase != targetStepPhaseBegin && tgs.CurPhase != targetStepPhaseRun) { - return nil, &cerrors.ErrTestStepReturnedUnexpectedResult{ - StepName: ss.sb.TestStepLabel, - Target: tgt.ID, - } + } + // Begin is also allowed here because it may happen that we get a result before target handler updates phase. + if tgs.CurStep != ss.stepIndex || + (tgs.CurPhase != targetStepPhaseBegin && tgs.CurPhase != targetStepPhaseRun) { + return &cerrors.ErrTestStepReturnedUnexpectedResult{ + StepName: ss.sb.TestStepLabel, + Target: tgt.ID, } - if tgs.resCh == nil { - // If canceled or paused, target handler may have left early. We don't care though. - select { - case <-ctx.Done(): - return nil, xcontext.ErrCanceled - default: - // This should not happen, must be an internal error. - return nil, fmt.Errorf("%s: target handler %s is not there, dropping result on the floor", ss, tgs) - } + } + if tgs.resCh == nil { + // If canceled or paused, target handler may have left early. We don't care though. + select { + case <-ctx.Done(): + return xcontext.ErrCanceled + default: + // This should not happen, must be an internal error. + return fmt.Errorf("%s: target handler %s is not there, dropping result on the floor", ss, tgs) } - tgs.CurPhase = targetStepPhaseResultPending - ctx.Debugf("%s: result for %s: %v", ss, tgs, res) - return tgs.resCh, nil - }() - if err != nil { - return err } + tgs.CurPhase = targetStepPhaseResultPending + ctx.Debugf("%s: result for %s: %v", ss, tgs, res) // As it has buffer size 1 and we process a single result for each target at a time - resCh <- res + tgs.resCh <- res return nil } diff --git a/pkg/runner/test_runner_test.go b/pkg/runner/test_runner_test.go index b27186f6..2b6b08cb 100644 --- a/pkg/runner/test_runner_test.go +++ b/pkg/runner/test_runner_test.go @@ -73,7 +73,7 @@ type runRes struct { } type MemoryStorageEngine struct { - Storage storage.ResettableStorage + Storage storage.ResettableStorage StorageEngineVault *storage.SimpleEngineVault } @@ -89,7 +89,7 @@ func NewMemoryStorageEngine() (*MemoryStorageEngine, error) { } return &MemoryStorageEngine{ - Storage: ms, + Storage: ms, StorageEngineVault: storageEngineVault, }, nil } @@ -206,7 +206,7 @@ func (s *TestRunnerSuite) Test1Step1Success() { require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][(*Target)(nil) TestStepRunningEvent]} {[1 1 SimpleTest 0 Step 1][(*Target)(nil) TestStepFinishedEvent]} -`, s.internalStorage.GetStepEvents(testName,"")) +`, s.internalStorage.GetStepEvents(testName, "")) require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TargetIn]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestStartedEvent]} @@ -218,7 +218,7 @@ func (s *TestRunnerSuite) Test1Step1Success() { // Simple case: one target, one step that blocks for a bit, success. // We block for longer than the shutdown timeout of the test runner. func (s *TestRunnerSuite) Test1StepLongerThanShutdown1Success() { - tr := NewTestRunnerWithTimeouts(100*time.Millisecond) + tr := NewTestRunnerWithTimeouts(100 * time.Millisecond) _, targetsResults, err := s.runWithTimeout(ctx, tr, nil, 1, 2*time.Second, []*target.Target{tgt("T1")}, []test.TestStepBundle{ @@ -239,7 +239,7 @@ func (s *TestRunnerSuite) Test1StepLongerThanShutdown1Success() { {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestStartedEvent]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestFinishedEvent]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TargetOut]} -`, s.internalStorage.GetTargetEvents(testName,"T1")) +`, s.internalStorage.GetTargetEvents(testName, "T1")) } // Simple case: one target, one step, failure. @@ -258,7 +258,7 @@ func (s *TestRunnerSuite) Test1Step1Fail() { require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][(*Target)(nil) TestStepRunningEvent]} {[1 1 SimpleTest 0 Step 1][(*Target)(nil) TestStepFinishedEvent]} -`, s.internalStorage.GetStepEvents(testName,"Step 1")) +`, s.internalStorage.GetStepEvents(testName, "Step 1")) require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TargetIn]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestStartedEvent]} @@ -316,7 +316,7 @@ func (s *TestRunnerSuite) Test3StepsNotReachedStepNotRun() { {[1 1 SimpleTest 0 Step 2][(*Target)(nil) TestStepRunningEvent]} {[1 1 SimpleTest 0 Step 2][(*Target)(nil) TestStepFinishedEvent]} `, s.internalStorage.GetStepEvents(testName, "Step 2")) - require.Equal(s.T(), "\n\n", s.internalStorage.GetStepEvents(testName,"Step 3")) + require.Equal(s.T(), "\n\n", s.internalStorage.GetStepEvents(testName, "Step 3")) require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TargetIn]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestStartedEvent]} @@ -338,10 +338,10 @@ func (s *TestRunnerSuite) Test3StepsNotReachedStepNotRun() { // A misbehaving step that fails to shut down properly after processing targets // and does not return. func (s *TestRunnerSuite) TestNoReturnStepWithCorrectTargetForwarding() { - tr := NewTestRunnerWithTimeouts(200*time.Millisecond) + tr := NewTestRunnerWithTimeouts(200 * time.Millisecond) ctx, cancel := xcontext.WithCancel(ctx) defer cancel() - _, _, err := s.runWithTimeout(ctx, tr, nil, 1, 2*time.Second, + _, _, err := s.runWithTimeout(ctx, tr, nil, 1, 2*time.Hour, []*target.Target{tgt("T1")}, []test.TestStepBundle{ s.newStep("Step 1", noreturn.Name, nil), @@ -363,7 +363,7 @@ func (s *TestRunnerSuite) TestStepPanics() { ) require.Error(s.T(), err) require.IsType(s.T(), &cerrors.ErrTestStepPaniced{}, err) - require.Equal(s.T(), "\n\n", s.internalStorage.GetTargetEvents(testName,"T1")) + require.Equal(s.T(), "\n\n", s.internalStorage.GetTargetEvents(testName, "T1")) require.Contains(s.T(), s.internalStorage.GetStepEvents(testName, "Step 1"), "step Step 1 paniced") } From 7922406f8562d8014ee8c10b466566cc90e44315 Mon Sep 17 00:00:00 2001 From: Ilya Date: Sat, 11 Dec 2021 00:07:39 +0000 Subject: [PATCH 18/20] Return original behaviour to reportTargetResult Make StepRunner.Run() use channels for communicating results Signed-off-by: Ilya --- pkg/runner/step_runner.go | 86 +++++++++---------- pkg/runner/test_runner.go | 149 +++++++++++++++++++++------------ pkg/runner/test_runner_test.go | 2 +- 3 files changed, 139 insertions(+), 98 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 3d19f8d3..277aed9a 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -15,8 +15,12 @@ import ( "github.com/linuxboot/contest/pkg/xcontext" ) -type OnTargetResult func(tgt *target.Target, res error) -type OnStepRunnerStopped func(err error) +type StepRunnerEvent struct { + // Target if set represents the target for which event was generated + Target *target.Target + // Err if Target is not nil refers to this Target result otherwise is execution error + Err error +} type StepResult struct { Err error @@ -36,7 +40,8 @@ type StepRunner struct { resultErr error resultResumeState json.RawMessage - stopCallback OnStepRunnerStopped + notifyStoppedOnce sync.Once + resultsChan chan<- StepRunnerEvent } func (sr *StepRunner) AddTarget(ctx xcontext.Context, tgt *target.Target) error { @@ -55,13 +60,7 @@ func (sr *StepRunner) Run( bundle test.TestStepBundle, ev testevent.Emitter, resumeState json.RawMessage, - targetCallback OnTargetResult, - stopCallback OnStepRunnerStopped, -) error { - if targetCallback == nil { - return fmt.Errorf("target callback should not be nil") - } - +) (<-chan StepRunnerEvent, error) { err := func() error { sr.mu.Lock() defer sr.mu.Unlock() @@ -72,41 +71,43 @@ func (sr *StepRunner) Run( return nil }() if err != nil { - return err + return nil, err } - srCtx, _ := xcontext.WithCancel(ctx) - sr.stopCallback = stopCallback + sr.finishedCh = make(chan struct{}) + resultsChan := make(chan StepRunnerEvent, 1) + sr.resultsChan = resultsChan var activeLoopsCount int32 = 2 finish := func() { if atomic.AddInt32(&activeLoopsCount, -1) != 0 { return } - sr.mu.Lock() - defer sr.mu.Unlock() + sr.mu.Lock() close(sr.finishedCh) sr.finishedCh = nil + sr.mu.Unlock() - // if an error occurred, this callback was invoked early - sr.notifyStoppedLocked(nil) - srCtx.Debugf("StepRunner finished") + // if an error occurred we already sent notification + sr.notifyStopped(nil) + close(sr.resultsChan) + ctx.Debugf("StepRunner finished") } stepOut := make(chan test.TestStepResult) go func() { defer finish() sr.runningLoop(ctx, stepOut, bundle, ev, resumeState) - srCtx.Debugf("Running loop finished") + ctx.Debugf("Running loop finished") }() go func() { defer finish() - sr.readingLoop(ctx, stepOut, bundle.TestStepLabel, targetCallback) - srCtx.Debugf("Reading loop finished") + sr.readingLoop(ctx, stepOut, bundle.TestStepLabel) + ctx.Debugf("Reading loop finished") }() - return nil + return resultsChan, nil } func (sr *StepRunner) Started() bool { @@ -168,10 +169,7 @@ func (sr *StepRunner) readingLoop( ctx xcontext.Context, stepOut chan test.TestStepResult, testStepLabel string, - targetCallback OnTargetResult, ) { - - cancelCh := ctx.Done() for { select { case res, ok := <-stepOut: @@ -202,9 +200,17 @@ func (sr *StepRunner) readingLoop( sr.setErr(ctx, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}) return } - targetCallback(res.Target, res.Err) - case <-cancelCh: + select { + case sr.resultsChan <- StepRunnerEvent{Target: res.Target, Err: res.Err}: + case <-ctx.Done(): + ctx.Debugf( + "reading loop detected context canceled, target '%s' with result: '%v' was not reported", + res.Target.ID, + res.Err, + ) + } + case <-ctx.Done(): ctx.Debugf("reading loop detected context canceled") return } @@ -236,12 +242,10 @@ func (sr *StepRunner) runningLoop( resultResumeState, err := func() (json.RawMessage, error) { defer func() { if r := recover(); r != nil { - sr.mu.Lock() - sr.setErrLocked(ctx, &cerrors.ErrTestStepPaniced{ + sr.setErr(ctx, &cerrors.ErrTestStepPaniced{ StepName: bundle.TestStepLabel, StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), }) - sr.mu.Unlock() } }() @@ -269,19 +273,13 @@ func (sr *StepRunner) setErrLocked(ctx xcontext.Context, err error) { } ctx.Errorf("err: %v", err) sr.resultErr = err - sr.notifyStoppedLocked(sr.resultErr) + sr.notifyStopped(sr.resultErr) } -func (sr *StepRunner) notifyStoppedLocked(err error) { - if sr.stopCallback == nil { - return - } - stopCallback := sr.stopCallback - sr.stopCallback = nil - - go func() { - stopCallback(err) - }() +func (sr *StepRunner) notifyStopped(err error) { + sr.notifyStoppedOnce.Do(func() { + sr.resultsChan <- StepRunnerEvent{Err: err} + }) } func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) { @@ -298,9 +296,7 @@ func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) { // NewStepRunner creates a new StepRunner object func NewStepRunner() *StepRunner { return &StepRunner{ - stepIn: make(chan *target.Target), - reportedTargets: make(map[string]struct{}), - runningLoopActive: true, - finishedCh: make(chan struct{}), + stepIn: make(chan *target.Target), + reportedTargets: make(map[string]struct{}), } } diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index da17068e..454436b6 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -8,7 +8,6 @@ package runner import ( "context" "encoding/json" - "errors" "fmt" "strconv" "sync" @@ -61,8 +60,9 @@ type stepState struct { stepIndex int // Index of this step in the pipeline. sb test.TestStepBundle // The test bundle. - ev testevent.Emitter - stepRunner *StepRunner + ev testevent.Emitter + stepRunner *StepRunner + readingLoopRunning bool resumeState json.RawMessage // Resume state passed to and returned by the Run method. runErr error // Runner error, returned from Run() or an error condition detected by the reader. @@ -243,8 +243,8 @@ func (tr *TestRunner) Run( ctx.Debugf("- %d in flight, ok to resume? %t", numInFlightTargets, resumeOk) ctx.Debugf("step states:") for i, ss := range tr.steps { - ctx.Debugf(" %d %s %t %t %v %s", - i, ss, ss.stepRunner.Started(), ss.stepRunner.Running(), ss.runErr, ss.resumeState) + ctx.Debugf(" %d %s %t %t %t %v %s", + i, ss, ss.stepRunner.Started(), ss.stepRunner.Running(), ss.readingLoopRunning, ss.runErr, ss.resumeState) } // Is there a useful error to report? @@ -303,6 +303,16 @@ func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { stepsNeverReturned = append(stepsNeverReturned, ss.sb.TestStepLabel) // Cancel this step's context, this will help release the reader. ss.cancel() + } else { + // reading loop can still be running, wait for it to finish + tr.mu.Lock() + for { + if !ss.readingLoopRunning { + break + } + tr.monitorCond.Wait() + } + tr.mu.Unlock() } tr.mu.Lock() ss.resumeState = result.ResumeState @@ -497,31 +507,56 @@ loop: // runStepIfNeeded starts the step runner goroutine if not already running. func (tr *TestRunner) runStepIfNeeded(ss *stepState) error { tr.mu.Lock() + defer tr.mu.Unlock() + + if ss.stepRunner.Started() { + return nil + } + resumeState := ss.resumeState ss.resumeState = nil - tr.mu.Unlock() - err := ss.stepRunner.Run(ss.ctx, ss.sb, ss.ev, resumeState, - func(tgt *target.Target, res error) { - if err := tr.reportTargetResult(ss.ctx, ss, tgt, res); err != nil { - ss.ctx.Errorf("Reporting target result failed: %v", err) - tr.mu.Lock() - ss.setErrLocked(err) - tr.mu.Unlock() - } - tr.monitorCond.Signal() - }, - func(err error) { + resultCh, err := ss.stepRunner.Run(ss.ctx, ss.sb, ss.ev, resumeState) + if err != nil { + return fmt.Errorf("failed to stert a step runner for '%s': %v", ss.sb.TestStepLabel, err) + } + + ss.readingLoopRunning = true + go func() { + defer func() { tr.mu.Lock() defer tr.mu.Unlock() - ss.setErrLocked(err) + + ss.readingLoopRunning = false tr.monitorCond.Signal() - }, - ) - if errors.Is(err, &cerrors.ErrAlreadyDone{}) { - return nil - } - return err + }() + for { + select { + case stepResult, ok := <-resultCh: + if !ok { + ss.ctx.Debugf("step runner results channel was closed") + return + } + if stepResult.Target == nil { + tr.mu.Lock() + ss.setErrLocked(stepResult.Err) + tr.mu.Unlock() + } else { + if err := tr.reportTargetResult(ss.ctx, ss, stepResult.Target, stepResult.Err); err != nil { + ss.ctx.Errorf("Reporting target result failed: %v", err) + tr.mu.Lock() + ss.setErrLocked(err) + tr.mu.Unlock() + } + } + tr.monitorCond.Signal() + case <-ss.ctx.Done(): + ss.ctx.Debugf("step context was cancelled") + return + } + } + }() + return nil } // setErrLocked sets step runner error unless already set. @@ -554,38 +589,48 @@ func (ss *stepState) emitEvent(ctx xcontext.Context, name event.Name, tgt *targe // reportTargetResult reports result of executing a step to the appropriate target handler. func (tr *TestRunner) reportTargetResult(ctx xcontext.Context, ss *stepState, tgt *target.Target, res error) error { - tr.mu.Lock() - defer tr.mu.Unlock() - tgs := tr.targets[tgt.ID] - if tgs == nil { - return &cerrors.ErrTestStepReturnedUnexpectedResult{ - StepName: ss.sb.TestStepLabel, - Target: tgt.ID, + resCh, err := func() (chan error, error) { + tr.mu.Lock() + defer tr.mu.Unlock() + tgs := tr.targets[tgt.ID] + if tgs == nil { + return nil, &cerrors.ErrTestStepReturnedUnexpectedResult{ + StepName: ss.sb.TestStepLabel, + Target: tgt.ID, + } } - } - // Begin is also allowed here because it may happen that we get a result before target handler updates phase. - if tgs.CurStep != ss.stepIndex || - (tgs.CurPhase != targetStepPhaseBegin && tgs.CurPhase != targetStepPhaseRun) { - return &cerrors.ErrTestStepReturnedUnexpectedResult{ - StepName: ss.sb.TestStepLabel, - Target: tgt.ID, + // Begin is also allowed here because it may happen that we get a result before target handler updates phase. + if tgs.CurStep != ss.stepIndex || + (tgs.CurPhase != targetStepPhaseBegin && tgs.CurPhase != targetStepPhaseRun) { + return nil, &cerrors.ErrTestStepReturnedUnexpectedResult{ + StepName: ss.sb.TestStepLabel, + Target: tgt.ID, + } } - } - if tgs.resCh == nil { - // If canceled or paused, target handler may have left early. We don't care though. - select { - case <-ctx.Done(): - return xcontext.ErrCanceled - default: - // This should not happen, must be an internal error. - return fmt.Errorf("%s: target handler %s is not there, dropping result on the floor", ss, tgs) + if tgs.resCh == nil { + // If canceled or paused, target handler may have left early. We don't care though. + select { + case <-ctx.Done(): + return nil, xcontext.ErrCanceled + default: + // This should not happen, must be an internal error. + return nil, fmt.Errorf("%s: target handler %s is not there, dropping result on the floor", ss, tgs) + } } + tgs.CurPhase = targetStepPhaseResultPending + ctx.Debugf("%s: result for %s: %v", ss, tgs, res) + return tgs.resCh, nil + }() + if err != nil { + return err } - tgs.CurPhase = targetStepPhaseResultPending - ctx.Debugf("%s: result for %s: %v", ss, tgs, res) - // As it has buffer size 1 and we process a single result for each target at a time - tgs.resCh <- res + select { + case resCh <- res: + break + case <-ss.ctx.Done(): + break + } return nil } @@ -675,7 +720,7 @@ tgtLoop: // It's been paused, this is fine. continue } - if ss.stepRunner.Started() && !ss.stepRunner.Running() { + if ss.stepRunner.Started() && !ss.readingLoopRunning { // Target has been injected but step runner has exited without a valid reason, this target has been lost. runErr = &cerrors.ErrTestStepLostTargets{ StepName: ss.sb.TestStepLabel, diff --git a/pkg/runner/test_runner_test.go b/pkg/runner/test_runner_test.go index 2b6b08cb..8f1962cc 100644 --- a/pkg/runner/test_runner_test.go +++ b/pkg/runner/test_runner_test.go @@ -341,7 +341,7 @@ func (s *TestRunnerSuite) TestNoReturnStepWithCorrectTargetForwarding() { tr := NewTestRunnerWithTimeouts(200 * time.Millisecond) ctx, cancel := xcontext.WithCancel(ctx) defer cancel() - _, _, err := s.runWithTimeout(ctx, tr, nil, 1, 2*time.Hour, + _, _, err := s.runWithTimeout(ctx, tr, nil, 1, 2*time.Second, []*target.Target{tgt("T1")}, []test.TestStepBundle{ s.newStep("Step 1", noreturn.Name, nil), From 5efe6d8063ca30f5afc55acb3786d8966eaa6180 Mon Sep 17 00:00:00 2001 From: Ilya Date: Mon, 13 Dec 2021 01:08:39 +0000 Subject: [PATCH 19/20] Add basic unit test for StepRunner Signed-off-by: Ilya --- pkg/runner/base_test_suite_test.go | 61 ++++++++++++++++++ pkg/runner/job_runner_test.go | 57 +++------------- pkg/runner/step_runner_test.go | 100 +++++++++++++++++++++++++++++ 3 files changed, 170 insertions(+), 48 deletions(-) create mode 100644 pkg/runner/base_test_suite_test.go create mode 100644 pkg/runner/step_runner_test.go diff --git a/pkg/runner/base_test_suite_test.go b/pkg/runner/base_test_suite_test.go new file mode 100644 index 00000000..b0569b6f --- /dev/null +++ b/pkg/runner/base_test_suite_test.go @@ -0,0 +1,61 @@ +package runner + +import ( + "encoding/json" + + "github.com/benbjohnson/clock" + "github.com/linuxboot/contest/pkg/event/testevent" + "github.com/linuxboot/contest/pkg/pluginregistry" + "github.com/linuxboot/contest/pkg/target" + "github.com/linuxboot/contest/pkg/test" + "github.com/linuxboot/contest/pkg/xcontext" + "github.com/linuxboot/contest/plugins/targetlocker/inmemory" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type BaseTestSuite struct { + suite.Suite + + pluginRegistry *pluginregistry.PluginRegistry + internalStorage *MemoryStorageEngine +} + +func (s *BaseTestSuite) SetupTest() { + storageEngine, err := NewMemoryStorageEngine() + require.NoError(s.T(), err) + s.internalStorage = storageEngine + + target.SetLocker(inmemory.New(clock.New())) + + s.pluginRegistry = pluginregistry.NewPluginRegistry(xcontext.Background()) +} + +func (s *BaseTestSuite) TearDownTest() { + target.SetLocker(nil) +} + +func (s *BaseTestSuite) RegisterStateFullStep( + runFunction func( + ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, + ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error), + validateFunction func(ctx xcontext.Context, params test.TestStepParameters) error) error { + + return s.pluginRegistry.RegisterTestStep(stateFullStepName, func() test.TestStep { + return &stateFullStep{ + runFunction: runFunction, + validateFunction: validateFunction, + } + }, nil) +} + +func (s *BaseTestSuite) NewStep(label, name string, params test.TestStepParameters) test.TestStepBundle { + td := test.TestStepDescriptor{ + Name: name, + Label: label, + Parameters: params, + } + sb, err := s.pluginRegistry.NewTestStepBundle(ctx, td) + require.NoError(s.T(), err) + return *sb +} diff --git a/pkg/runner/job_runner_test.go b/pkg/runner/job_runner_test.go index 32cb7630..15ecbfac 100644 --- a/pkg/runner/job_runner_test.go +++ b/pkg/runner/job_runner_test.go @@ -17,12 +17,10 @@ import ( "github.com/linuxboot/contest/pkg/event" "github.com/linuxboot/contest/pkg/event/testevent" "github.com/linuxboot/contest/pkg/job" - "github.com/linuxboot/contest/pkg/pluginregistry" "github.com/linuxboot/contest/pkg/storage" "github.com/linuxboot/contest/pkg/target" "github.com/linuxboot/contest/pkg/test" "github.com/linuxboot/contest/pkg/xcontext" - "github.com/linuxboot/contest/plugins/targetlocker/inmemory" "github.com/linuxboot/contest/plugins/targetmanagers/targetlist" "github.com/linuxboot/contest/plugins/teststeps" "github.com/linuxboot/contest/plugins/teststeps/echo" @@ -84,10 +82,7 @@ func (r *collectingReporter) FinalReport(ctx xcontext.Context, parameters interf } type JobRunnerSuite struct { - suite.Suite - - pluginRegistry *pluginregistry.PluginRegistry - internalStorage *MemoryStorageEngine + BaseTestSuite } func TestTestStepSuite(t *testing.T) { @@ -95,13 +90,8 @@ func TestTestStepSuite(t *testing.T) { } func (s *JobRunnerSuite) SetupTest() { - storageEngine, err := NewMemoryStorageEngine() - require.NoError(s.T(), err) - s.internalStorage = storageEngine + s.BaseTestSuite.SetupTest() - target.SetLocker(inmemory.New(clock.New())) - - s.pluginRegistry = pluginregistry.NewPluginRegistry(xcontext.Background()) for _, e := range []struct { name string factory test.TestStepFactory @@ -113,40 +103,11 @@ func (s *JobRunnerSuite) SetupTest() { } } -func (s *JobRunnerSuite) TearDownTest() { - target.SetLocker(nil) -} - -func (s *JobRunnerSuite) registerStateFullStep( - runFunction func( - ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, - ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error), - validateFunction func(ctx xcontext.Context, params test.TestStepParameters) error) error { - - return s.pluginRegistry.RegisterTestStep(stateFullStepName, func() test.TestStep { - return &stateFullStep{ - runFunction: runFunction, - validateFunction: validateFunction, - } - }, nil) -} - -func (s *JobRunnerSuite) newStep(label, name string, params test.TestStepParameters) test.TestStepBundle { - td := test.TestStepDescriptor{ - Name: name, - Label: label, - Parameters: params, - } - sb, err := s.pluginRegistry.NewTestStepBundle(ctx, td) - require.NoError(s.T(), err) - return *sb -} - func (s *JobRunnerSuite) TestSimpleJobStartFinish() { var mu sync.Mutex var resultTargets []*target.Target - require.NoError(s.T(), s.registerStateFullStep( + require.NoError(s.T(), s.RegisterStateFullStep( func(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) { return teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error { assert.NotNil(s.T(), target) @@ -180,7 +141,7 @@ func (s *JobRunnerSuite) TestSimpleJobStartFinish() { TargetManager: targetlist.New(), }, TestStepsBundles: []test.TestStepBundle{ - s.newStep("test_step_label", stateFullStepName, nil), + s.NewStep("test_step_label", stateFullStepName, nil), }, }, }, @@ -209,7 +170,7 @@ func (s *JobRunnerSuite) TestJobWithTestRetry() { var resultTargets []*target.Target var callsCount int - require.NoError(s.T(), s.registerStateFullStep( + require.NoError(s.T(), s.RegisterStateFullStep( func(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) { return teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error { assert.NotNil(s.T(), target) @@ -259,11 +220,11 @@ func (s *JobRunnerSuite) TestJobWithTestRetry() { TargetManager: targetlist.New(), }, TestStepsBundles: []test.TestStepBundle{ - s.newStep("echo1_step_label", echo.Name, map[string][]test.Param{ + s.NewStep("echo1_step_label", echo.Name, map[string][]test.Param{ "text": {*test.NewParam("hello")}, }), - s.newStep("test_step_label", stateFullStepName, nil), - s.newStep("echo2_step_label", echo.Name, map[string][]test.Param{ + s.NewStep("test_step_label", stateFullStepName, nil), + s.NewStep("echo2_step_label", echo.Name, map[string][]test.Param{ "text": {*test.NewParam("world")}, }), }, @@ -342,7 +303,7 @@ func (s *JobRunnerSuite) TestResumeStateBadJobId() { TargetManager: targetlist.New(), }, TestStepsBundles: []test.TestStepBundle{ - s.newStep("echo1_step_label", echo.Name, map[string][]test.Param{ + s.NewStep("echo1_step_label", echo.Name, map[string][]test.Param{ "text": {*test.NewParam("hello")}, }), }, diff --git a/pkg/runner/step_runner_test.go b/pkg/runner/step_runner_test.go new file mode 100644 index 00000000..70a5e46b --- /dev/null +++ b/pkg/runner/step_runner_test.go @@ -0,0 +1,100 @@ +package runner + +import ( + "encoding/json" + "fmt" + "sync" + "testing" + + "github.com/linuxboot/contest/pkg/event/testevent" + "github.com/linuxboot/contest/pkg/target" + "github.com/linuxboot/contest/pkg/test" + "github.com/linuxboot/contest/pkg/xcontext" + "github.com/linuxboot/contest/plugins/teststeps" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +func TestStepRunnerSuite(t *testing.T) { + suite.Run(t, new(StepRunnerSuite)) +} + +type StepRunnerSuite struct { + BaseTestSuite +} + +func (s *StepRunnerSuite) TestRunningStep() { + targetsReaction := map[string]error{ + "TSucc": nil, + "TFail": fmt.Errorf("oops"), + } + + var mu sync.Mutex + var obtainedTargets []target.Target + var obtainedResumeState json.RawMessage + + err := s.RegisterStateFullStep( + func(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) { + obtainedResumeState = resumeState + _, err := teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error { + require.NotNil(s.T(), target) + + mu.Lock() + defer mu.Unlock() + obtainedTargets = append(obtainedTargets, *target) + return targetsReaction[target.ID] + }) + if err != nil { + return nil, err + } + return json.RawMessage("{\"output\": true}"), nil + }, + nil, + ) + require.NoError(s.T(), err) + + stepRunner := NewStepRunner() + require.NotNil(s.T(), stepRunner) + + emitterFactory := NewTestStepEventsEmitterFactory(s.internalStorage.StorageEngineVault, 1, 1, testName, 0) + emitter := emitterFactory.New("test_step_label") + + inputResumeState := json.RawMessage("{\"some_input\": 42}") + resultChan, err := stepRunner.Run(ctx, s.NewStep("test_step_label", stateFullStepName, nil), emitter, inputResumeState) + require.NoError(s.T(), err) + require.NotNil(s.T(), resultChan) + + require.NoError(s.T(), stepRunner.AddTarget(ctx, tgt("TSucc"))) + ev, ok := <-resultChan + require.True(s.T(), ok) + require.Equal(s.T(), tgt("TSucc"), ev.Target) + require.NoError(s.T(), ev.Err) + + require.NoError(s.T(), stepRunner.AddTarget(ctx, tgt("TFail"))) + ev, ok = <-resultChan + require.True(s.T(), ok) + require.Equal(s.T(), tgt("TFail"), ev.Target) + require.Error(s.T(), ev.Err) + + stepRunner.Stop() + + ev, ok = <-resultChan + require.True(s.T(), ok) + require.Nil(s.T(), ev.Target) + require.NoError(s.T(), ev.Err) + + ev, ok = <-resultChan + require.False(s.T(), ok) + + closedCtx, cancel := xcontext.WithCancel(ctx) + cancel() + + // if step runner has results, it should return them even if input context is closed + res, err := stepRunner.WaitResults(closedCtx) + require.NoError(s.T(), err) + + require.Equal(s.T(), json.RawMessage("{\"output\": true}"), res.ResumeState) + require.NoError(s.T(), res.Err) + + require.Equal(s.T(), inputResumeState, obtainedResumeState) +} From 62d049c632aba94e2fd71490f7f7ab2c47e1932e Mon Sep 17 00:00:00 2001 From: Ilya Date: Tue, 14 Dec 2021 23:18:27 +0000 Subject: [PATCH 20/20] Code style improvements of StepRunner Signed-off-by: Ilya --- pkg/runner/step_runner.go | 63 ++++++++++++++++++++------------------- pkg/runner/test_runner.go | 3 +- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go index 277aed9a..25d78830 100644 --- a/pkg/runner/step_runner.go +++ b/pkg/runner/step_runner.go @@ -30,23 +30,24 @@ type StepResult struct { type StepRunner struct { mu sync.Mutex - stepIn chan *target.Target - reportedTargets map[string]struct{} - - started bool - runningLoopActive bool + input chan *target.Target stopOnce sync.Once + resultsChan chan<- StepRunnerEvent + runningLoopActive bool finishedCh chan struct{} resultErr error resultResumeState json.RawMessage notifyStoppedOnce sync.Once - resultsChan chan<- StepRunnerEvent } func (sr *StepRunner) AddTarget(ctx xcontext.Context, tgt *target.Target) error { + if tgt == nil { + return fmt.Errorf("target should not be nil") + } + select { - case sr.stepIn <- tgt: + case sr.input <- tgt: case <-ctx.Until(xcontext.ErrPaused): return xcontext.ErrPaused case <-ctx.Done(): @@ -61,20 +62,15 @@ func (sr *StepRunner) Run( ev testevent.Emitter, resumeState json.RawMessage, ) (<-chan StepRunnerEvent, error) { - err := func() error { - sr.mu.Lock() - defer sr.mu.Unlock() - if sr.started { - return &cerrors.ErrAlreadyDone{} - } - sr.started = true - return nil - }() - if err != nil { - return nil, err + sr.mu.Lock() + defer sr.mu.Unlock() + + if sr.resultsChan != nil { + return nil, &cerrors.ErrAlreadyDone{} } - sr.finishedCh = make(chan struct{}) + finishedCh := make(chan struct{}) + sr.finishedCh = finishedCh resultsChan := make(chan StepRunnerEvent, 1) sr.resultsChan = resultsChan @@ -95,10 +91,11 @@ func (sr *StepRunner) Run( ctx.Debugf("StepRunner finished") } + stepIn := sr.input stepOut := make(chan test.TestStepResult) go func() { defer finish() - sr.runningLoop(ctx, stepOut, bundle, ev, resumeState) + sr.runningLoop(ctx, stepIn, stepOut, bundle, ev, resumeState) ctx.Debugf("Running loop finished") }() @@ -107,6 +104,7 @@ func (sr *StepRunner) Run( sr.readingLoop(ctx, stepOut, bundle.TestStepLabel) ctx.Debugf("Reading loop finished") }() + return resultsChan, nil } @@ -114,14 +112,14 @@ func (sr *StepRunner) Started() bool { sr.mu.Lock() defer sr.mu.Unlock() - return sr.started + return sr.resultsChan != nil } func (sr *StepRunner) Running() bool { sr.mu.Lock() defer sr.mu.Unlock() - return sr.started && sr.finishedCh != nil + return sr.resultsChan != nil && sr.finishedCh != nil } // WaitResults returns TestStep.Run() output @@ -161,7 +159,7 @@ func (sr *StepRunner) WaitResults(ctx context.Context) (stepResult StepResult, e // Stop triggers TestStep to stop running by closing input channel func (sr *StepRunner) Stop() { sr.stopOnce.Do(func() { - close(sr.stepIn) + close(sr.input) }) } @@ -170,6 +168,7 @@ func (sr *StepRunner) readingLoop( stepOut chan test.TestStepResult, testStepLabel string, ) { + reportedTargets := make(map[string]struct{}) for { select { case res, ok := <-stepOut: @@ -191,10 +190,8 @@ func (sr *StepRunner) readingLoop( } ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID) - sr.mu.Lock() - _, found := sr.reportedTargets[res.Target.ID] - sr.reportedTargets[res.Target.ID] = struct{}{} - sr.mu.Unlock() + _, found := reportedTargets[res.Target.ID] + reportedTargets[res.Target.ID] = struct{}{} if found { sr.setErr(ctx, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}) @@ -219,6 +216,7 @@ func (sr *StepRunner) readingLoop( func (sr *StepRunner) runningLoop( ctx xcontext.Context, + stepIn <-chan *target.Target, stepOut chan test.TestStepResult, bundle test.TestStepBundle, ev testevent.Emitter, @@ -249,7 +247,7 @@ func (sr *StepRunner) runningLoop( } }() - inChannels := test.TestStepChannels{In: sr.stepIn, Out: stepOut} + inChannels := test.TestStepChannels{In: stepIn, Out: stepOut} return bundle.TestStep.Run(ctx, inChannels, bundle.Parameters, ev, resumeState) }() ctx.Debugf("TestStep finished '%v', rs %s", err, string(resultResumeState)) @@ -273,7 +271,11 @@ func (sr *StepRunner) setErrLocked(ctx xcontext.Context, err error) { } ctx.Errorf("err: %v", err) sr.resultErr = err - sr.notifyStopped(sr.resultErr) + + // notifyStopped is a blocking operation: should release the lock + sr.mu.Unlock() + sr.notifyStopped(err) + sr.mu.Lock() } func (sr *StepRunner) notifyStopped(err error) { @@ -296,7 +298,6 @@ func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) { // NewStepRunner creates a new StepRunner object func NewStepRunner() *StepRunner { return &StepRunner{ - stepIn: make(chan *target.Target), - reportedTargets: make(map[string]struct{}), + input: make(chan *target.Target), } } diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 454436b6..20668205 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -538,6 +538,7 @@ func (tr *TestRunner) runStepIfNeeded(ss *stepState) error { return } if stepResult.Target == nil { + ss.ctx.Errorf("step runner results an err: %v", err) tr.mu.Lock() ss.setErrLocked(stepResult.Err) tr.mu.Unlock() @@ -564,7 +565,7 @@ func (ss *stepState) setErrLocked(err error) { if err == nil || ss.runErr != nil { return } - ss.ctx.Errorf("err: %v", err) + ss.ctx.Errorf("setErrLocked: %v", err) ss.runErr = err }