Skip to content

Commit

Permalink
Allow waiting for starting complete (#119)
Browse files Browse the repository at this point in the history
* Allow waiting for starting complete

Did this by creating an output for starting

* Fix linting errors

---------

Co-authored-by: Dustin Black <[email protected]>
  • Loading branch information
jaredoconnell and dustinblack committed Nov 6, 2023
1 parent 877786d commit d2ab013
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 13 deletions.
38 changes: 30 additions & 8 deletions internal/step/plugin/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,19 @@ func (r *runnableStep) RunSchema() map[string]*schema.PropertySchema {
}
}

func (r *runnableStep) StartedSchema() *schema.StepOutputSchema {
return schema.NewStepOutputSchema(
schema.NewScopeSchema(
schema.NewObjectSchema(
"StartedOutput",
map[string]*schema.PropertySchema{},
),
),
nil,
false,
)
}

func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[step.LifecycleStageWithSchema], err error) {
rawStepID, ok := input["step"]
if !ok || rawStepID == nil {
Expand Down Expand Up @@ -447,6 +460,9 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
nil,
),
},
Outputs: map[string]*schema.StepOutputSchema{
"started": r.StartedSchema(),
},
},
{
LifecycleStage: runningLifecycleStage,
Expand Down Expand Up @@ -954,7 +970,8 @@ func (r *runningStep) startStage(container deployer.Plugin) error {

func (r *runningStep) runStage() error {
r.logger.Debugf("Running stage for step %s/%s", r.runID, r.pluginStepID)
r.transitionStage(StageIDRunning, step.RunningStepStateRunning)
startedOutput := any(map[any]any{})
r.transitionStageWithOutput(StageIDRunning, step.RunningStepStateRunning, schema.PointerTo("started"), &startedOutput)

var result atp.ExecutionResult
select {
Expand All @@ -981,7 +998,7 @@ func (r *runningStep) runStage() error {

// Execution complete, move to state running stage outputs, then to state finished stage.
r.transitionStage(StageIDOutput, step.RunningStepStateRunning)
r.completeStage(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData)
r.completeStep(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData)

return nil
}
Expand All @@ -996,7 +1013,7 @@ func (r *runningStep) deployFailed(err error) {
output := any(DeployFailed{
Error: err.Error(),
})
r.completeStage(StageIDDeployFailed, step.RunningStepStateFinished, &outputID, &output)
r.completeStep(StageIDDeployFailed, step.RunningStepStateFinished, &outputID, &output)
}

func (r *runningStep) startFailed(err error) {
Expand All @@ -1010,7 +1027,7 @@ func (r *runningStep) startFailed(err error) {
Output: err.Error(),
})

r.completeStage(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output)
r.completeStep(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output)
}

func (r *runningStep) runFailed(err error) {
Expand All @@ -1023,13 +1040,18 @@ func (r *runningStep) runFailed(err error) {
output := any(Crashed{
Output: err.Error(),
})
r.completeStage(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output)
r.completeStep(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output)
}

// TransitionStage transitions the stage to the specified stage, and the state to the specified state.
//
//nolint:unparam
func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepState) {
r.transitionStageWithOutput(newStage, state, nil, nil)
}

// TransitionStage transitions the stage to the specified stage, and the state to the specified state.
func (r *runningStep) transitionStageWithOutput(newStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) {
// A current lack of observability into the atp client prevents
// non-fragile testing of this function.
r.lock.Lock()
Expand All @@ -1042,16 +1064,16 @@ func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepSt
r.stageChangeHandler.OnStageChange(
r,
&previousStage,
nil,
nil,
outputID,
previousStageOutput,
string(newStage),
false,
&r.wg,
)
}

//nolint:unparam
func (r *runningStep) completeStage(currentStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) {
func (r *runningStep) completeStep(currentStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) {
r.lock.Lock()
previousStage := string(r.currentStage)
r.currentStage = currentStage
Expand Down
2 changes: 1 addition & 1 deletion internal/step/plugin/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func TestProvider_VerifyCancelSignal(t *testing.T) {
waitStageIDCancelled := waitLifecycle.Stages[waitCancelledStageIDIndex]
waitStopIfSchema := assert.MapContainsKey(t, "stop_if", waitStageIDCancelled.InputSchema)
if waitStopIfSchema.Disabled {
t.Fatalf("step wait's wait_for schema is disabled when the cancel signal is present.")
t.Fatalf("step wait's stop_if schema is disabled when the cancel signal is present.")
}

helloLifecycle, err := runnable.Lifecycle(map[string]any{"step": "hello"})
Expand Down
4 changes: 2 additions & 2 deletions workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (e *executor) createTypeStructure(rootSchema schema.Scope, inputField any,
value := v.Index(i).Interface()
newValue, err := e.createTypeStructure(rootSchema, value, workflowContext)
if err != nil {
return nil, fmt.Errorf("failed to resolve expressions (%w)", err)
return nil, fmt.Errorf("failed to resolve slice expressions (%w)", err)
}
result[i] = newValue
}
Expand All @@ -479,7 +479,7 @@ func (e *executor) createTypeStructure(rootSchema schema.Scope, inputField any,
value := v.MapIndex(reflectedKey).Interface()
newValue, err := e.createTypeStructure(rootSchema, value, workflowContext)
if err != nil {
return nil, fmt.Errorf("failed to resolve expressions (%w)", err)
return nil, fmt.Errorf("failed to resolve map expressions (%w)", err)
}
result[keyAsStr] = newValue
}
Expand Down
4 changes: 2 additions & 2 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error
value := v.Index(i).Interface()
newValue, err := l.resolveExpressions(value, dataModel)
if err != nil {
return nil, fmt.Errorf("failed to resolve expressions (%w)", err)
return nil, fmt.Errorf("failed to resolve workflow slice expressions (%w)", err)
}
result[i] = newValue
}
Expand All @@ -536,7 +536,7 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error
value := v.MapIndex(reflectedKey).Interface()
newValue, err := l.resolveExpressions(value, dataModel)
if err != nil {
return nil, fmt.Errorf("failed to resolve expressions (%w)", err)
return nil, fmt.Errorf("failed to resolve workflow map expressions (%w)", err)
}
result[key] = newValue
}
Expand Down
64 changes: 64 additions & 0 deletions workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,70 @@ func TestWaitForSerial(t *testing.T) {
}
}

var waitForStartedWorkflowDefinition = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
pre_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 2
first_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 2
wait_for: !expr $.steps.pre_wait.outputs
second_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 2
wait_for: !expr $.steps.first_wait.starting.started
outputs:
success:
#first_step_output: !expr $.steps.first_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

func TestWaitForStarted(t *testing.T) {
// For this test, the second step is depending on a step's running state, which is not a finished output node.
logConfig := log.Config{
Level: log.LevelDebug,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
stepRegistry := NewTestImplStepRegistry(logger, t)

executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForStartedWorkflowDefinition)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{}))
outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
}

var missingInputsFailedDeploymentWorkflowDefinition = `
version: v0.2.0
input:
Expand Down

0 comments on commit d2ab013

Please sign in to comment.