From d2ab013257ef0f47ec813a9c69387eb02a69d583 Mon Sep 17 00:00:00 2001 From: Jared O'Connell <46976761+jaredoconnell@users.noreply.github.com> Date: Mon, 6 Nov 2023 11:33:14 -0500 Subject: [PATCH] Allow waiting for starting complete (#119) * Allow waiting for starting complete Did this by creating an output for starting * Fix linting errors --------- Co-authored-by: Dustin Black --- internal/step/plugin/provider.go | 38 ++++++++++++---- internal/step/plugin/provider_test.go | 2 +- workflow/executor.go | 4 +- workflow/workflow.go | 4 +- workflow/workflow_test.go | 64 +++++++++++++++++++++++++++ 5 files changed, 99 insertions(+), 13 deletions(-) diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index 8912e95b..29a21e1d 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -321,6 +321,19 @@ func (r *runnableStep) RunSchema() map[string]*schema.PropertySchema { } } +func (r *runnableStep) StartedSchema() *schema.StepOutputSchema { + return schema.NewStepOutputSchema( + schema.NewScopeSchema( + schema.NewObjectSchema( + "StartedOutput", + map[string]*schema.PropertySchema{}, + ), + ), + nil, + false, + ) +} + func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[step.LifecycleStageWithSchema], err error) { rawStepID, ok := input["step"] if !ok || rawStepID == nil { @@ -447,6 +460,9 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st nil, ), }, + Outputs: map[string]*schema.StepOutputSchema{ + "started": r.StartedSchema(), + }, }, { LifecycleStage: runningLifecycleStage, @@ -954,7 +970,8 @@ func (r *runningStep) startStage(container deployer.Plugin) error { func (r *runningStep) runStage() error { r.logger.Debugf("Running stage for step %s/%s", r.runID, r.pluginStepID) - r.transitionStage(StageIDRunning, step.RunningStepStateRunning) + startedOutput := any(map[any]any{}) + r.transitionStageWithOutput(StageIDRunning, step.RunningStepStateRunning, schema.PointerTo("started"), &startedOutput) var result atp.ExecutionResult select { @@ -981,7 +998,7 @@ func (r *runningStep) runStage() error { // Execution complete, move to state running stage outputs, then to state finished stage. r.transitionStage(StageIDOutput, step.RunningStepStateRunning) - r.completeStage(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData) + r.completeStep(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData) return nil } @@ -996,7 +1013,7 @@ func (r *runningStep) deployFailed(err error) { output := any(DeployFailed{ Error: err.Error(), }) - r.completeStage(StageIDDeployFailed, step.RunningStepStateFinished, &outputID, &output) + r.completeStep(StageIDDeployFailed, step.RunningStepStateFinished, &outputID, &output) } func (r *runningStep) startFailed(err error) { @@ -1010,7 +1027,7 @@ func (r *runningStep) startFailed(err error) { Output: err.Error(), }) - r.completeStage(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output) + r.completeStep(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output) } func (r *runningStep) runFailed(err error) { @@ -1023,13 +1040,18 @@ func (r *runningStep) runFailed(err error) { output := any(Crashed{ Output: err.Error(), }) - r.completeStage(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output) + r.completeStep(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output) } // TransitionStage transitions the stage to the specified stage, and the state to the specified state. // //nolint:unparam func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepState) { + r.transitionStageWithOutput(newStage, state, nil, nil) +} + +// TransitionStage transitions the stage to the specified stage, and the state to the specified state. +func (r *runningStep) transitionStageWithOutput(newStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) { // A current lack of observability into the atp client prevents // non-fragile testing of this function. r.lock.Lock() @@ -1042,8 +1064,8 @@ func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepSt r.stageChangeHandler.OnStageChange( r, &previousStage, - nil, - nil, + outputID, + previousStageOutput, string(newStage), false, &r.wg, @@ -1051,7 +1073,7 @@ func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepSt } //nolint:unparam -func (r *runningStep) completeStage(currentStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) { +func (r *runningStep) completeStep(currentStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) { r.lock.Lock() previousStage := string(r.currentStage) r.currentStage = currentStage diff --git a/internal/step/plugin/provider_test.go b/internal/step/plugin/provider_test.go index 2f96a2e4..6858b002 100644 --- a/internal/step/plugin/provider_test.go +++ b/internal/step/plugin/provider_test.go @@ -399,7 +399,7 @@ func TestProvider_VerifyCancelSignal(t *testing.T) { waitStageIDCancelled := waitLifecycle.Stages[waitCancelledStageIDIndex] waitStopIfSchema := assert.MapContainsKey(t, "stop_if", waitStageIDCancelled.InputSchema) if waitStopIfSchema.Disabled { - t.Fatalf("step wait's wait_for schema is disabled when the cancel signal is present.") + t.Fatalf("step wait's stop_if schema is disabled when the cancel signal is present.") } helloLifecycle, err := runnable.Lifecycle(map[string]any{"step": "hello"}) diff --git a/workflow/executor.go b/workflow/executor.go index f1a4add4..a6ddaeaf 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -463,7 +463,7 @@ func (e *executor) createTypeStructure(rootSchema schema.Scope, inputField any, value := v.Index(i).Interface() newValue, err := e.createTypeStructure(rootSchema, value, workflowContext) if err != nil { - return nil, fmt.Errorf("failed to resolve expressions (%w)", err) + return nil, fmt.Errorf("failed to resolve slice expressions (%w)", err) } result[i] = newValue } @@ -479,7 +479,7 @@ func (e *executor) createTypeStructure(rootSchema schema.Scope, inputField any, value := v.MapIndex(reflectedKey).Interface() newValue, err := e.createTypeStructure(rootSchema, value, workflowContext) if err != nil { - return nil, fmt.Errorf("failed to resolve expressions (%w)", err) + return nil, fmt.Errorf("failed to resolve map expressions (%w)", err) } result[keyAsStr] = newValue } diff --git a/workflow/workflow.go b/workflow/workflow.go index 9ffab54d..379f649f 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -524,7 +524,7 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error value := v.Index(i).Interface() newValue, err := l.resolveExpressions(value, dataModel) if err != nil { - return nil, fmt.Errorf("failed to resolve expressions (%w)", err) + return nil, fmt.Errorf("failed to resolve workflow slice expressions (%w)", err) } result[i] = newValue } @@ -536,7 +536,7 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error value := v.MapIndex(reflectedKey).Interface() newValue, err := l.resolveExpressions(value, dataModel) if err != nil { - return nil, fmt.Errorf("failed to resolve expressions (%w)", err) + return nil, fmt.Errorf("failed to resolve workflow map expressions (%w)", err) } result[key] = newValue } diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index c2bd6bc4..330b7ed0 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -346,6 +346,70 @@ func TestWaitForSerial(t *testing.T) { } } +var waitForStartedWorkflowDefinition = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + pre_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 2 + first_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 2 + wait_for: !expr $.steps.pre_wait.outputs + second_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 2 + wait_for: !expr $.steps.first_wait.starting.started +outputs: + success: + #first_step_output: !expr $.steps.first_wait.outputs + second_step_output: !expr $.steps.second_wait.outputs +` + +func TestWaitForStarted(t *testing.T) { + // For this test, the second step is depending on a step's running state, which is not a finished output node. + logConfig := log.Config{ + Level: log.LevelDebug, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + stepRegistry := NewTestImplStepRegistry(logger, t) + + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForStartedWorkflowDefinition))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") +} + var missingInputsFailedDeploymentWorkflowDefinition = ` version: v0.2.0 input: