diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index 10447053..23ad9798 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -147,6 +147,11 @@ const ( StageIDDeployFailed StageID = "deploy_failed" // StageIDRunning is a stage that indicates that a plugin is now working. StageIDRunning StageID = "running" + // StageIDEnabling is a stage that indicates that the plugin is waiting to be enabled. + // This is required to be separate to ensure that it exits immediately if disabled. + StageIDEnabling StageID = "enabling" + // StageIDDisabled is a stage that indicates that the plugin's step was disabled. + StageIDDisabled StageID = "disabled" // StageIDCancelled is a stage that indicates that the plugin's step was cancelled. StageIDCancelled StageID = "cancelled" // StageIDOutput is a stage that indicates that the plugin has completed working successfully. @@ -176,6 +181,19 @@ var deployFailedLifecycleStage = step.LifecycleStage{ Fatal: true, } +var enablingLifecycleStage = step.LifecycleStage{ + ID: string(StageIDEnabling), + WaitingName: "waiting to be enabled", + RunningName: "enabling", + FinishedName: "enablement determined", + InputFields: map[string]struct{}{ + "enabled": {}, + }, + NextStages: []string{ + string(StageIDStarting), string(StageIDDisabled), string(StageIDCrashed), + }, +} + var startingLifecycleStage = step.LifecycleStage{ ID: string(StageIDStarting), WaitingName: "waiting to start", @@ -213,6 +231,15 @@ var cancelledLifecycleStage = step.LifecycleStage{ string(StageIDOutput), string(StageIDCrashed), string(StageIDDeployFailed), }, } + +var disabledLifecycleStage = step.LifecycleStage{ + ID: string(StageIDDisabled), + WaitingName: "waiting for the step to be disabled", + RunningName: "disabling", + FinishedName: "disabled", + InputFields: map[string]struct{}{}, +} + var finishedLifecycleStage = step.LifecycleStage{ ID: string(StageIDOutput), WaitingName: "finished", @@ -233,9 +260,11 @@ func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { Stages: []step.LifecycleStage{ deployingLifecycleStage, deployFailedLifecycleStage, + enablingLifecycleStage, startingLifecycleStage, runningLifecycleStage, cancelledLifecycleStage, + disabledLifecycleStage, finishedLifecycleStage, crashedLifecycleStage, }, @@ -338,6 +367,60 @@ func (r *runnableStep) StartedSchema() *schema.StepOutputSchema { ) } +func (r *runnableStep) EnabledOutputSchema() *schema.StepOutputSchema { + return schema.NewStepOutputSchema( + schema.NewScopeSchema( + schema.NewObjectSchema( + "EnabledOutput", + map[string]*schema.PropertySchema{ + "enabled": schema.NewPropertySchema( + schema.NewBoolSchema(), + schema.NewDisplayValue( + schema.PointerTo("enabled"), + schema.PointerTo("Whether the step was enabled"), + nil), + true, + nil, + nil, + nil, + nil, + nil, + ), + }, + ), + ), + nil, + false, + ) +} + +func (r *runnableStep) DisabledOutputSchema() *schema.StepOutputSchema { + return schema.NewStepOutputSchema( + schema.NewScopeSchema( + schema.NewObjectSchema( + "DisabledMessageOutput", + map[string]*schema.PropertySchema{ + "message": schema.NewPropertySchema( + schema.NewStringSchema(nil, nil, nil), + schema.NewDisplayValue( + schema.PointerTo("message"), + schema.PointerTo("A human readable message stating that the step was disabled."), + nil), + true, + nil, + nil, + nil, + nil, + nil, + ), + }, + ), + ), + nil, + false, + ) +} + func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[step.LifecycleStageWithSchema], err error) { rawStepID, ok := input["step"] if !ok || rawStepID == nil { @@ -436,6 +519,28 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st }, }, }, + { + LifecycleStage: enablingLifecycleStage, + InputSchema: map[string]*schema.PropertySchema{ + "enabled": schema.NewPropertySchema( + schema.NewBoolSchema(), + schema.NewDisplayValue( + schema.PointerTo("Enabled"), + schema.PointerTo("Used to set whether the step is enabled."), + nil, + ), + false, + nil, + nil, + nil, + nil, + nil, + ), + }, + Outputs: map[string]*schema.StepOutputSchema{ + "resolved": r.EnabledOutputSchema(), + }, + }, { LifecycleStage: startingLifecycleStage, InputSchema: map[string]*schema.PropertySchema{ @@ -480,6 +585,13 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st }, Outputs: nil, }, + { + LifecycleStage: disabledLifecycleStage, + InputSchema: nil, + Outputs: map[string]*schema.StepOutputSchema{ + "output": r.DisabledOutputSchema(), + }, + }, { LifecycleStage: finishedLifecycleStage, InputSchema: nil, @@ -563,6 +675,7 @@ func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHand cancel: cancel, deployInput: make(chan any, 1), runInput: make(chan any, 1), + enabledInput: make(chan bool, 1), logger: r.logger, deploymentType: r.deploymentType, source: r.source, @@ -581,32 +694,34 @@ func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHand } type runningStep struct { - deployerRegistry registry.Registry - stepSchema schema.Step - stageChangeHandler step.StageChangeHandler - lock *sync.Mutex - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc - atpClient atp.Client - deployInput chan any - deployInputAvailable bool - runInput chan any - runInputAvailable bool - logger log.Logger - currentStage StageID - runID string // The ID associated with this execution (the workflow step ID) - deploymentType deployer.DeploymentType - source string - pluginStepID string // The ID of the step in the plugin - state step.RunningStepState - useLocalDeployer bool - localDeployer deployer.Connector - container deployer.Plugin - executionChannel chan atp.ExecutionResult - signalToStep chan schema.Input // Communicates with the ATP client, not other steps. - signalFromStep chan schema.Input // Communicates with the ATP client, not other steps. - closed bool + deployerRegistry registry.Registry + stepSchema schema.Step + stageChangeHandler step.StageChangeHandler + lock *sync.Mutex + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + atpClient atp.Client + deployInput chan any + deployInputAvailable bool + enabledInput chan bool + enabledInputAvailable bool + runInput chan any + runInputAvailable bool + logger log.Logger + currentStage StageID + runID string // The ID associated with this execution (the workflow step ID) + deploymentType deployer.DeploymentType + source string + pluginStepID string // The ID of the step in the plugin + state step.RunningStepState + useLocalDeployer bool + localDeployer deployer.Connector + container deployer.Plugin + executionChannel chan atp.ExecutionResult + signalToStep chan schema.Input // Communicates with the ATP client, not other steps. + signalFromStep chan schema.Input // Communicates with the ATP client, not other steps. + closed bool // Store channels for sending pre-calculated signal outputs to other steps? // Store channels for receiving pre-calculated signal inputs from other steps? } @@ -640,6 +755,8 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro return r.provideDeployInput(input) case string(StageIDStarting): return r.provideStartingInput(input) + case string(StageIDEnabling): + return r.provideEnablingInput(input) case string(StageIDRunning): return nil case string(StageIDCancelled): @@ -688,10 +805,23 @@ func (r *runningStep) provideDeployInput(input map[string]any) error { return nil } +func (r *runningStep) provideEnablingInput(input map[string]any) error { + // Note: The calling function must have the step mutex locked + if r.enabledInputAvailable { + return fmt.Errorf("enabled input provided more than once") + } + // Check to make sure it's enabled. + // This is an optional field, so no input means enabled. + enabled := input["enabled"] == nil || input["enabled"] == true + r.enabledInputAvailable = true + r.enabledInput <- enabled + return nil +} + func (r *runningStep) provideStartingInput(input map[string]any) error { // Note: The calling function must have the step mutex locked if r.runInputAvailable { - return fmt.Errorf("input provided more than once") + return fmt.Errorf("starting input provided more than once") } // Ensure input is given if input["input"] == nil { @@ -817,6 +947,17 @@ func (r *runningStep) run() { } r.lock.Unlock() r.logger.Debugf("Successfully deployed container with ID '%s' for step %s/%s", container.ID(), r.runID, r.pluginStepID) + + r.logger.Debugf("Checking to see if step %s/%s is enabled", r.runID, r.pluginStepID) + enabled, err := r.enableStage() + if err != nil { + r.startFailed(err) + } + r.logger.Debugf("Step %s/%s enablement state: %t", r.runID, r.pluginStepID, enabled) + if !enabled { + r.transitionToDisabled() + return + } if err := r.startStage(container); err != nil { r.startFailed(err) return @@ -885,26 +1026,13 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) { return container, nil } -func (r *runningStep) startStage(container deployer.Plugin) error { - r.logger.Debugf("Starting stage for step %s/%s", r.runID, r.pluginStepID) - atpClient := atp.NewClientWithLogger(container, r.logger) +// enableStage returns the result of whether the stage was enabled or not. +func (r *runningStep) enableStage() (bool, error) { r.lock.Lock() previousStage := string(r.currentStage) - r.currentStage = StageIDStarting - inputRecievedEarly := false - r.atpClient = atpClient - - var runInput any - select { - case runInput = <-r.runInput: - // Good. It received it immediately. - r.state = step.RunningStepStateRunning - inputRecievedEarly = true - default: // The default makes it not wait. - r.state = step.RunningStepStateWaitingForInput - } - - runInputAvailable := r.runInputAvailable + r.currentStage = StageIDEnabling + enabledInputAvailable := r.enabledInputAvailable + r.state = step.RunningStepStateWaitingForInput r.lock.Unlock() r.stageChangeHandler.OnStageChange( @@ -912,20 +1040,52 @@ func (r *runningStep) startStage(container deployer.Plugin) error { &previousStage, nil, nil, - string(StageIDStarting), - runInputAvailable, + string(StageIDEnabling), + enabledInputAvailable, &r.wg, ) + select { + case enabled := <-r.enabledInput: + return enabled, nil + case <-r.ctx.Done(): + return false, fmt.Errorf("step closed while determining enablement status") + } +} + +func (r *runningStep) startStage(container deployer.Plugin) error { + r.logger.Debugf("Starting stage for step %s/%s", r.runID, r.pluginStepID) + atpClient := atp.NewClientWithLogger(container, r.logger) + var inputReceivedEarly bool r.lock.Lock() - r.currentStage = StageIDStarting - r.logger.Debugf("Waiting for input state while starting 2.") + r.atpClient = atpClient r.lock.Unlock() + var runInput any + var newState step.RunningStepState + select { + case runInput = <-r.runInput: + // Good. It received it immediately. + newState = step.RunningStepStateRunning + inputReceivedEarly = true + default: // The default makes it not wait. + newState = step.RunningStepStateWaitingForInput + inputReceivedEarly = false + } + + enabledOutput := any(map[any]any{"enabled": true}) + // End Enabling with resolved output, and start starting + r.transitionStageWithOutput( + StageIDStarting, + newState, + schema.PointerTo("resolved"), + &enabledOutput, + ) + // First, try to non-blocking retrieve the runInput. // If not yet available, set to state waiting for input and do a blocking receive. // If it is available, continue. - if !inputRecievedEarly { + if !inputReceivedEarly { // Input is not yet available. Now waiting. r.lock.Lock() if r.state != step.RunningStepStateWaitingForInput { @@ -1030,6 +1190,25 @@ func (r *runningStep) transitionToCancelled() { r.transitionStage(StageIDCancelled, step.RunningStepStateFinished) } +func (r *runningStep) transitionToDisabled() { + r.logger.Infof("Step %s/%s disabled", r.runID, r.pluginStepID) + enabledOutput := any(map[any]any{"enabled": false}) + // End prior stage "enabling" with "resolved" output, and start "disabled" stage. + r.transitionStageWithOutput( + StageIDDisabled, + step.RunningStepStateRunning, + schema.PointerTo("resolved"), + &enabledOutput, + ) + disabledOutput := any(map[any]any{"message": fmt.Sprintf("Step %s/%s disabled", r.runID, r.pluginStepID)}) + r.completeStep( + StageIDDisabled, + step.RunningStepStateFinished, // Must set the stage to finished for the engine realize the step is done. + schema.PointerTo("output"), + &disabledOutput, + ) +} + func (r *runningStep) startFailed(err error) { r.logger.Debugf("Start failed stage for step %s/%s", r.runID, r.pluginStepID) r.transitionStage(StageIDCrashed, step.RunningStepStateRunning) diff --git a/internal/step/plugin/provider_test.go b/internal/step/plugin/provider_test.go index e695355f..84a523e0 100644 --- a/internal/step/plugin/provider_test.go +++ b/internal/step/plugin/provider_test.go @@ -250,6 +250,13 @@ func TestProvider_HappyError(t *testing.T) { "deploy_time": "abc"}}, )) + // Enable the step + assert.NoError(t, running.ProvideStageInput( + string(plugin.StageIDEnabling), + map[string]any{}, + )) + + // Start with valid input assert.NoError(t, running.ProvideStageInput( string(plugin.StageIDDeploy), map[string]any{"deploy": map[string]any{ @@ -485,6 +492,11 @@ func TestProvider_StartFail(t *testing.T) { "wait_time_ms": 50}}, )) + assert.NoError(t, running.ProvideStageInput( + string(plugin.StageIDEnabling), + map[string]any{}, + )) + // wait for message, but we don't care about its value <-handler.message diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index c12d45a4..657b91e2 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -729,13 +729,13 @@ steps: wait_time_ms: 0 deploy: deployer_name: "test-impl" - #deploy_time: 20000 # 10 ms - deploy_succeed: false + deploy_succeed: false # This step will fail due to this. wait_2: plugin: src: "n/a" deployment_type: "builtin" step: wait + # This step waits for the failing step here. wait_for: !expr $.steps.wait_1.outputs.success input: wait_time_ms: 0 @@ -1213,6 +1213,121 @@ func TestInputCancelledStepWorkflow(t *testing.T) { assert.Contains(t, err.Error(), "cannot construct any output") } +var inputDisabledStepWorkflow = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + step_enabled: + type: + type_id: bool +steps: + simple_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 20 + enabled: !expr $.input.step_enabled +outputs: + success: + simple_wait_output: !expr $.steps.simple_wait.outputs.success +` + +func TestInputDisabledStepWorkflow(t *testing.T) { + // Run a workflow with one step that has its enablement state + // set by the input. The output depends on successful output + // of the step that can be disabled, so this test case also + // tests that the failure caused when it's disabled doesn't + // lead to a deadlock. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, inputDisabledStepWorkflow), + ) + // The workflow should pass with it enabled + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "step_enabled": true, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + // The workflow should fail with it disabled because the output cannot be resolved. + _, _, err = preparedWorkflow.Execute(context.Background(), map[string]any{ + "step_enabled": false, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot construct any output") +} + +var dynamicDisabledStepWorkflow = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + sleep_time: + type: + type_id: integer +steps: + initial_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: !expr $.input.sleep_time # ms + toggled_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 + enabled: !expr $.steps.initial_wait.outputs.success.message == "Plugin slept for 20 ms." +outputs: + success: + initial_wait_output: !expr $.steps.initial_wait.outputs.success + toggled_wait_output: !expr $.steps.toggled_wait.outputs.success + disabled: + initial_wait_output: !expr $.steps.initial_wait.outputs.success + toggled_wait_output: !expr $.steps.toggled_wait.disabled.output +` + +func TestDelayedDisabledStepWorkflow(t *testing.T) { + // Run a workflow where the step is disabled by a value that isn't available + // at the start of the workflow; in this case the step is disabled from + // another step's output. + // This workflow has an output for success and an output for disabled. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, dynamicDisabledStepWorkflow), + ) + // The second step expects a 20ms sleep/wait. + // Pass with a 20ms input. + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "sleep_time": 20, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + // Fail with a non-20ms input. + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "sleep_time": 19, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "disabled") + assert.InstanceOf[map[any]any](t, outputData) + outputMap := outputData.(map[any]any) + assert.MapContainsKey(t, "toggled_wait_output", outputMap) + toggledOutput := outputMap["toggled_wait_output"] + assert.InstanceOf[map[any]any](t, toggledOutput) + toggledOutputMap := toggledOutput.(map[any]any) + assert.MapContainsKey(t, "message", toggledOutputMap) + assert.Equals(t, toggledOutputMap["message"], "Step toggled_wait/wait disabled") +} + func createTestExecutableWorkflow(t *testing.T, workflowStr string, workflowCtx map[string][]byte) (workflow.ExecutableWorkflow, error) { logConfig := log.Config{ Level: log.LevelDebug,