Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/event/testevent/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Header struct {
RunID types.RunID
TestName string
TestStepLabel string
Retry int
}

// Data models the data of a test event. It is populated by the TestStep
Expand All @@ -46,6 +47,11 @@ func New(header *Header, data *Data) Event {
return Event{Header: header, Data: data}
}

// NewWithCurrentEmitTime creates a new Event and sets EmitTime to time.Now()
func NewWithCurrentEmitTime(header *Header, data *Data) Event {
return Event{Header: header, Data: data, EmitTime: time.Now()}
}

// Query wraps information that are used to build queries for
// test events, on top of the common EventQuery fields
type Query struct {
Expand Down Expand Up @@ -155,7 +161,7 @@ type EmitterFetcher interface {
}

func (h *Header) String() string {
return fmt.Sprintf("[%d %d %s %s]", h.JobID, h.RunID, h.TestName, h.TestStepLabel)
return fmt.Sprintf("[%d %d %s %s %d]", h.JobID, h.RunID, h.TestName, h.TestStepLabel, h.Retry)
}

func (d *Data) String() string {
Expand Down
9 changes: 5 additions & 4 deletions pkg/pluginregistry/bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ func (r *PluginRegistry) NewTestStepBundle(ctx xcontext.Context, testStepDescrip
return nil, ErrStepLabelIsMandatory{TestStepDescriptor: testStepDescriptor}
}
testStepBundle := test.TestStepBundle{
TestStep: testStep,
TestStepLabel: label,
Parameters: testStepDescriptor.Parameters,
AllowedEvents: allowedEvents,
TestStep: testStep,
TestStepLabel: label,
Parameters: testStepDescriptor.Parameters,
RetryParameters: testStepDescriptor.RetryParameters,
AllowedEvents: allowedEvents,
}
return &testStepBundle, nil
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/runner/job_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,31 @@ func (jr *JobRunner) buildTestStepStatus(ctx xcontext.Context, coordinates job.T
return nil, fmt.Errorf("could not fetch events associated to test step %s: %v", coordinates.TestStepLabel, err)
}

// keep only events with the latest retry
var lastTestStepRetry int
for _, ev := range testEvents {
if ev.Header.Retry > lastTestStepRetry {
lastTestStepRetry = ev.Header.Retry
}
}

var stepEvents, targetEvents []testevent.Event
for _, event := range testEvents {
if event.Data.Target == nil {
for _, ev := range testEvents {
if ev.Header.Retry >= 0 && ev.Header.Retry != lastTestStepRetry {
continue
}
if ev.Data.Target == nil {
// we don't want target routing events in step events, but we want
// them in target events below
if _, skip := targetRoutingEvents[event.Data.EventName]; skip {
ctx.Warnf("Found routing event '%s' with no target associated, this could indicate a bug", event.Data.EventName)
if _, skip := targetRoutingEvents[ev.Data.EventName]; skip {
ctx.Warnf("Found routing event '%s' with no target associated, this could indicate a bug", ev.Data.EventName)
continue
}
// this goes into TestStepStatus.Events
stepEvents = append(stepEvents, event)
stepEvents = append(stepEvents, ev)
} else {
// this goes into TargetStatus.Events
targetEvents = append(targetEvents, event)
targetEvents = append(targetEvents, ev)
}
}

Expand Down
175 changes: 131 additions & 44 deletions pkg/runner/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type stepState struct {
// Channels used to communicate with the plugin.
inCh chan *target.Target
outCh chan test.TestStepResult
ev testevent.Emitter
ev *eventDistributor

tgtDone map[*target.Target]bool // Targets for which results have been received.

Expand All @@ -85,19 +85,23 @@ const (
targetStepPhaseRun // (3) Injected into step.
targetStepPhaseResultPending // (4) Result posted to the handler.
targetStepPhaseEnd // (5) Finished running a step.
targetStepPhaseSleepRetry // (6) A step is sleeping waiting to be retried
)

// targetState contains state associated with one target progressing through the pipeline.
type targetState struct {
tgt *target.Target

// This part of state gets serialized into JSON for resumption.
CurStep int `json:"S,omitempty"` // Current step number.
CurPhase targetStepPhase `json:"P,omitempty"` // Current phase of step execution.
Res *xjson.Error `json:"R,omitempty"` // Final result, if reached the end state.
CurStep int `json:"S,omitempty"` // Current step number.
CurPhase targetStepPhase `json:"P,omitempty"` // Current phase of step execution.
Res *xjson.Error `json:"R,omitempty"` // Result of the last attempt or final, if reached the end state.
CurRetry int `json:"CR"` // Current retry number.
NextAttempt *time.Time `json:"NA,omitempty"` // Timestap for the next attempt to begin.

handlerRunning bool
resCh chan error // Channel used to communicate result by the step runner.
resChClosed bool
}

// resumeStateStruct is used to serialize runner state to be resumed in the future.
Expand Down Expand Up @@ -173,18 +177,13 @@ func (tr *TestRunner) run(
srs = rs.StepResumeState[i]
}
tr.steps = append(tr.steps, &stepState{
ctx: stepCtx,
cancel: stepCancel,
stepIndex: i,
sb: sb,
inCh: make(chan *target.Target),
outCh: make(chan test.TestStepResult),
ev: storage.NewTestEventEmitter(testevent.Header{
JobID: jobID,
RunID: runID,
TestName: t.Name,
TestStepLabel: sb.TestStepLabel,
}),
ctx: stepCtx,
cancel: stepCancel,
stepIndex: i,
sb: sb,
ev: newEventDistributor(jobID, runID, t.Name, sb.TestStepLabel),
inCh: make(chan *target.Target),
outCh: make(chan test.TestStepResult),
tgtDone: make(map[*target.Target]bool),
resumeState: srs,
})
Expand Down Expand Up @@ -404,13 +403,13 @@ func (tr *TestRunner) injectTarget(ctx xcontext.Context, tgs *targetState, ss *s
return err
}

func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, ss *stepState) error {
func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, ss *stepState) (error, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; in order to differentiate, i used a type rename in other diffs;

type outcome error

select {
case res, ok := <-tgs.resCh:
if !ok {
// Channel is closed when job is paused to make sure all results are processed.
ctx.Debugf("%s: result channel closed", tgs)
return xcontext.ErrPaused
return nil, xcontext.ErrPaused
}
ctx.Debugf("%s: result recd for %s", tgs, ss)
var err error
Expand All @@ -422,14 +421,7 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState,
if err != nil {
ctx.Errorf("failed to emit event: %s", err)
}
tr.mu.Lock()
if res != nil {
tgs.Res = xjson.NewError(res)
}
tgs.CurPhase = targetStepPhaseEnd
tr.mu.Unlock()
tr.cond.Signal()
return err
return res, err
// Check for cancellation.
// Notably we are not checking for the pause condition here:
// when paused, we want to let all the injected targets to finish
Expand All @@ -439,18 +431,21 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState,
tr.mu.Lock()
ctx.Debugf("%s: canceled 2", tgs)
tr.mu.Unlock()
return xcontext.ErrCanceled
return nil, xcontext.ErrCanceled
}
}

// targetHandler takes a single target through each step of the pipeline in sequence.
// It injects the target, waits for the result, then moves on to the next step.
func (tr *TestRunner) targetHandler(ctx xcontext.Context, tgs *targetState) {
func (tr *TestRunner) targetHandler(
ctx xcontext.Context,
tgs *targetState,
) {
ctx = ctx.WithField("target", tgs.tgt.ID)
ctx.Debugf("%s: target handler active", tgs)
// NB: CurStep may be non-zero on entry if resumed
loop:
for i := tgs.CurStep; i < len(tr.steps); {
for stepIndex := tgs.CurStep; stepIndex < len(tr.steps); {
// Early check for pause or cancelation.
select {
case <-ctx.Until(xcontext.ErrPaused):
Expand All @@ -462,27 +457,45 @@ loop:
default:
}
tr.mu.Lock()
ss := tr.steps[i]
ss := tr.steps[stepIndex]
var inject bool
switch tgs.CurPhase {
case targetStepPhaseInit:
// Normal case, inject and wait for result.
inject = true
tgs.CurPhase = targetStepPhaseBegin
fallthrough
case targetStepPhaseBegin:
// Paused before injection.
inject = true
ss.ev.setRetry(tgs.tgt.ID, tgs.CurRetry)
case targetStepPhaseRun:
// Resumed in running state, skip injection.
inject = false
case targetStepPhaseSleepRetry:
sleepTime := time.Until(*tgs.NextAttempt)
ctx.Debugf("%s: sleep for %s before retrying", tgs, sleepTime)
tr.mu.Unlock()
select {
case <-time.After(sleepTime):
tr.mu.Lock()
tgs.CurPhase = targetStepPhaseBegin
ss.tgtDone[tgs.tgt] = false
tr.mu.Unlock()
continue loop
case <-ctx.Until(xcontext.ErrPaused):
case <-ctx.Done():
}
break loop
case targetStepPhaseEnd:
// Resumed in terminal state, we are done.
tr.mu.Unlock()
break loop
default:
ctx.Errorf("%s: invalid phase %s", tgs, tgs.CurPhase)
// Must be a job created by an incompatible version of the server, fail it.
ss.setErrLocked(fmt.Errorf("%s: invalid phase %s", tgs, tgs.CurPhase))
break loop
}

tr.mu.Unlock()
// Make sure we have a step runner active. If not, start one.
tr.runStepIfNeeded(ss)
Expand All @@ -493,8 +506,9 @@ loop:
}
// Await result. It will be communicated to us by the step runner
// and returned in tgs.res.
var res error
if err == nil {
err = tr.awaitTargetResult(ctx, tgs, ss)
res, err = tr.awaitTargetResult(ctx, tgs, ss)
}
tr.mu.Lock()
if err != nil {
Expand All @@ -510,16 +524,34 @@ loop:
tr.mu.Unlock()
break
}
if tgs.Res != nil {
tr.mu.Unlock()
break
if res != nil {
tgs.Res = xjson.NewError(res)
tgs.CurRetry++
if tgs.CurRetry > ss.sb.RetryParameters.NumRetries {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; this seems more like a MaxRetries, rather than NumRetries

tgs.CurPhase = targetStepPhaseEnd
tr.mu.Unlock()
break
} else {
nextAttempt := time.Now()
if ri := ss.sb.RetryParameters.RetryInterval; ri != nil {
nextAttempt = nextAttempt.Add(time.Duration(*ri))
}
tgs.NextAttempt = &nextAttempt
tgs.CurPhase = targetStepPhaseSleepRetry
tr.mu.Unlock()
continue loop
}
} else {
tgs.Res = nil
}
i++
if i < len(tr.steps) {
tgs.CurStep = i
// Advance to the next step.
stepIndex++
if stepIndex < len(tr.steps) {
tgs.CurStep = stepIndex
tgs.CurPhase = targetStepPhaseInit
}
tr.mu.Unlock()
tr.cond.Signal()
}
tr.mu.Lock()
ctx.Debugf("%s: target handler finished", tgs)
Expand Down Expand Up @@ -722,9 +754,9 @@ func (tr *TestRunner) checkStepRunnersLocked() error {
case xcontext.ErrPaused:
// This is fine, just need to unblock target handlers waiting on result from this step.
for _, tgs := range tr.targets {
if tgs.resCh != nil && tgs.CurStep == i {
if !tgs.resChClosed && tgs.CurStep == i {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this extra bool needed, apart from the channel being null?

close(tgs.resCh)
tgs.resCh = nil
tgs.resChClosed = true
}
}
default:
Expand Down Expand Up @@ -769,7 +801,7 @@ stepLoop:
if !tgs.handlerRunning { // Not running anymore
continue
}
if tgs.CurStep < step || tgs.CurPhase < targetStepPhaseRun {
if tgs.CurStep <= step || tgs.CurPhase < targetStepPhaseRun {
ctx.Debugf("monitor pass %d: %s: not all targets injected yet (%s)", pass, ss, tgs)
ok = false
break
Expand Down Expand Up @@ -853,6 +885,8 @@ func (tph targetStepPhase) String() string {
return "run"
case targetStepPhaseResultPending:
return "result_pending"
case targetStepPhaseSleepRetry:
return "sleep_retry"
case targetStepPhaseEnd:
return "end"
}
Expand All @@ -875,6 +909,59 @@ func (tgs *targetState) String() string {
resText = "<nil>"
}
finished := !tgs.handlerRunning
return fmt.Sprintf("[%s %d %s %t %s]",
tgs.tgt, tgs.CurStep, tgs.CurPhase, finished, resText)
return fmt.Sprintf("[%s %d %s %d %t %s]",
tgs.tgt, tgs.CurStep, tgs.CurPhase, tgs.CurRetry, finished, resText)
}

// eventDistributor keeps track of retries in target state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this name is kinda confusing for me. The main method is Emit but this object is also "keeping track of retries in target state"? Im not sure i understand the business logic here

type eventDistributor struct {
header testevent.Header
mu sync.Mutex
targetRetries map[string]int
}

func newEventDistributor(jobID types.JobID, runID types.RunID, testName string, testStepLabel string) *eventDistributor {
return &eventDistributor{
header: testevent.Header{
JobID: jobID,
RunID: runID,
TestName: testName,
TestStepLabel: testStepLabel,
},
targetRetries: make(map[string]int),
}
}

func (ed *eventDistributor) setRetry(targetID string, retry int) {
ed.mu.Lock()
defer ed.mu.Unlock()
ed.targetRetries[targetID] = retry
}

func (ed *eventDistributor) Emit(ctx xcontext.Context, data testevent.Data) error {
if data.Target == nil {
return ed.emitForUnknownTarget(ctx, data)
}

ed.mu.Lock()
retry, found := ed.targetRetries[data.Target.ID]
ed.mu.Unlock()

if !found {
// this should never happen
ctx.Errorf("Unknown target ID: '%s'", data.Target.ID)
return ed.emitForUnknownTarget(ctx, data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this should never happen, why does it still have en emit event handler (with invalid header because of the -1 retry) and not an error?

}

h := ed.header
h.Retry = retry
ev := testevent.NewWithCurrentEmitTime(&h, &data)
return storage.EmitTestEvent(ctx, ev)
}

func (ed *eventDistributor) emitForUnknownTarget(ctx xcontext.Context, data testevent.Data) error {
h := ed.header
h.Retry = -1
ev := testevent.NewWithCurrentEmitTime(&h, &data)
return storage.EmitTestEvent(ctx, ev)
}
Loading