Skip to content

Commit

Permalink
Start StepRunner via explicit Run method
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya <[email protected]>
  • Loading branch information
rihter007 committed Nov 25, 2021
1 parent 1606015 commit c5098b3
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 117 deletions.
165 changes: 88 additions & 77 deletions pkg/runner/step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,28 @@ import (
"github.com/linuxboot/contest/pkg/xcontext"
)

type OnTargetResult func(res error)
type OnTargetResult func(tgt *target.Target, res error)
type OnStepRunnerStopped func(err error)

type StepRunner struct {
ctx xcontext.Context
cancel context.CancelFunc
mu sync.Mutex
stopCallback OnStepRunnerStopped
ctx xcontext.Context
cancel context.CancelFunc
mu sync.Mutex
targetCallback OnTargetResult
stopCallback OnStepRunnerStopped

stepIn chan *target.Target
addedTargets map[string]OnTargetResult
stopOnce sync.Once
reportedTargets map[string]struct{}
started uint32
runningLoopActive bool

stopped chan struct{}
finished chan struct{}
resultErr error
resultResumeState json.RawMessage
}

func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) error {
if callback == nil {
return fmt.Errorf("callback should not be nil")
}
err := func(tgt *target.Target, callback OnTargetResult) error {
func (sr *StepRunner) AddTarget(tgt *target.Target) error {
err := func(tgt *target.Target) error {
sr.mu.Lock()
defer sr.mu.Unlock()

Expand All @@ -48,14 +47,8 @@ func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) err
if sr.stepIn == nil {
return fmt.Errorf("step runner is stopped")
}

existingCb := sr.addedTargets[tgt.ID]
if existingCb != nil {
return fmt.Errorf("existing target")
}
sr.addedTargets[tgt.ID] = callback
return nil
}(tgt, callback)
}(tgt)
if err != nil {
return err
}
Expand All @@ -71,18 +64,61 @@ func (sr *StepRunner) AddTarget(tgt *target.Target, callback OnTargetResult) err
return nil
}

func (sr *StepRunner) IsRunning() bool {
func (sr *StepRunner) Run(
bundle test.TestStepBundle,
ev testevent.Emitter,
resumeState json.RawMessage,
) {
if !atomic.CompareAndSwapUint32(&sr.started, 0, 1) {
return
}

var activeLoopsCount int32 = 2
onFinished := func() {
if atomic.AddInt32(&activeLoopsCount, -1) != 0 {
return
}
sr.mu.Lock()
defer sr.mu.Unlock()

close(sr.finished)
sr.finished = nil

// if an error occurred, this callback was invoked early
sr.notifyStoppedLocked(nil)
sr.ctx.Debugf("StepRunner finished")
}

stepOut := make(chan test.TestStepResult)
go func() {
defer onFinished()
sr.runningLoop(stepOut, bundle, ev, resumeState)
sr.ctx.Debugf("Running loop finished")
}()

go func() {
defer onFinished()
sr.readingLoop(stepOut, bundle.TestStepLabel)
sr.ctx.Debugf("Reading loop finished")
}()
}

func (sr *StepRunner) Started() bool {
return atomic.LoadUint32(&sr.started) == 1
}

func (sr *StepRunner) Running() bool {
sr.mu.Lock()
defer sr.mu.Unlock()

return sr.stopped != nil
return sr.Started() && sr.finished != nil
}

func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error) {
sr.mu.Lock()
resultErr := sr.resultErr
resultResumeState := sr.resultResumeState
stepStopped := sr.stopped
stepStopped := sr.finished
sr.mu.Unlock()

if resultErr != nil {
Expand All @@ -104,24 +140,22 @@ func (sr *StepRunner) WaitResults(ctx context.Context) (json.RawMessage, error)

// Stop triggers TestStep to stop running by closing input channel
func (sr *StepRunner) Stop() {
sr.mu.Lock()
defer sr.mu.Unlock()

if sr.stepIn != nil {
sr.ctx.Debugf("Close input channel")
sr.stopOnce.Do(func() {
close(sr.stepIn)
sr.stepIn = nil
}
sr.ctx.Debugf("Input channel was closed")
})
}

func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabel string) {
invokePanicSafe := func(callback OnTargetResult, res error) {
func (sr *StepRunner) targetCallbackPanicSafe(tgt *target.Target, res error) {
defer func() {
if r := recover(); r != nil {
sr.ctx.Errorf("Callback panic, stack: %s", debug.Stack())
}
callback(res)
}
}()
sr.targetCallback(tgt, res)
}

func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabel string) {
for {
select {
case res, ok := <-stepOut:
Expand All @@ -141,21 +175,18 @@ func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabe
sr.setErr(&cerrors.ErrTestStepReturnedNoTarget{StepName: testStepLabel})
return
}
sr.ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID)

sr.mu.Lock()
callback, found := sr.addedTargets[res.Target.ID]
sr.addedTargets[res.Target.ID] = nil
_, found := sr.reportedTargets[res.Target.ID]
sr.reportedTargets[res.Target.ID] = struct{}{}
sr.mu.Unlock()

if !found {
sr.setErr(&cerrors.ErrTestStepReturnedUnexpectedResult{StepName: testStepLabel, Target: res.Target.ID})
return
}
if callback == nil {
if found {
sr.setErr(&cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID})
return
}
invokePanicSafe(callback, res.Err)
sr.targetCallbackPanicSafe(res.Target, res.Err)

case <-sr.ctx.Done():
sr.ctx.Debugf("canceled readingLoop")
Expand All @@ -165,7 +196,7 @@ func (sr *StepRunner) readingLoop(stepOut chan test.TestStepResult, testStepLabe
}

func (sr *StepRunner) runningLoop(
stepIn chan *target.Target, stepOut chan test.TestStepResult,
stepOut chan test.TestStepResult,
bundle test.TestStepBundle, ev testevent.Emitter, resumeState json.RawMessage,
) {
defer func() {
Expand All @@ -190,9 +221,13 @@ func (sr *StepRunner) runningLoop(
}
}()

inChannels := test.TestStepChannels{In: stepIn, Out: stepOut}
sr.mu.Lock()
sr.runningLoopActive = true
sr.mu.Unlock()

inChannels := test.TestStepChannels{In: sr.stepIn, Out: stepOut}
resultResumeState, err := bundle.TestStep.Run(sr.ctx, inChannels, bundle.Parameters, ev, resumeState)
sr.ctx.Debugf("Step runner finished '%v', rs %s", err, string(resultResumeState))
sr.ctx.Debugf("TestStep finished '%v', rs %s", err, string(resultResumeState))

sr.mu.Lock()
sr.setErrLocked(err)
Expand Down Expand Up @@ -246,48 +281,24 @@ func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) {
// NewStepRunner creates a new StepRunner object
func NewStepRunner(
ctx xcontext.Context,
bundle test.TestStepBundle,
ev testevent.Emitter,
resumeState json.RawMessage,
targetCallback OnTargetResult,
stoppedCallback OnStepRunnerStopped,
) *StepRunner {
if targetCallback == nil {
panic("target callback should not be nil")
}
stepIn := make(chan *target.Target)
stepOut := make(chan test.TestStepResult)

srCrx, cancel := xcontext.WithCancel(ctx)
sr := &StepRunner{
ctx: srCrx,
cancel: cancel,
stepIn: stepIn,
addedTargets: make(map[string]OnTargetResult),
runningLoopActive: true,
stopped: make(chan struct{}),
targetCallback: targetCallback,
stopCallback: stoppedCallback,
reportedTargets: make(map[string]struct{}),
runningLoopActive: true,
finished: make(chan struct{}),
}

var activeLoopsCount int32 = 2
onFinished := func() {
if atomic.AddInt32(&activeLoopsCount, -1) != 0 {
return
}
sr.mu.Lock()
defer sr.mu.Unlock()

close(sr.stopped)
sr.stopped = nil

// if an error occurred, this callback was invoked early
sr.notifyStoppedLocked(nil)
}

go func() {
defer onFinished()
sr.runningLoop(stepIn, stepOut, bundle, ev, resumeState)
}()

go func() {
defer onFinished()
sr.readingLoop(stepOut, bundle.TestStepLabel)
}()
return sr
}
Loading

0 comments on commit c5098b3

Please sign in to comment.