diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index 446cf287..855305d7 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -14,7 +14,15 @@ import ( "go.flow.arcalot.io/pluginsdk/schema" ) +const errorStr = "error" + // New creates a new plugin provider. +// deployerRegistry The registry that contains all possible deployers. +// localDeployerConfig The section of the workflow config that pertains to all of the +// +// deployers. Most importantly it specifies which deployer is used for this +// deployment with the 'type' key. +// For more info, see `config/schema.go` func New(logger log.Logger, deployerRegistry registry.Registry, localDeployerConfig any) (step.Provider, error) { unserializedLocalDeployerConfig, err := deployerRegistry.Schema().Unserialize(localDeployerConfig) if err != nil { @@ -85,6 +93,8 @@ const ( StageIDOutput StageID = "outputs" // StageIDCrashed is a stage that indicates that the plugin has quit unexpectedly. StageIDCrashed StageID = "crashed" + // StageIDStarting is a stage that indicates that the plugin execution has begun. + StageIDStarting StageID = "starting" ) var deployingLifecycleStage = step.LifecycleStage{ @@ -94,7 +104,7 @@ var deployingLifecycleStage = step.LifecycleStage{ FinishedName: "deployed", InputFields: map[string]struct{}{string(StageIDDeploy): {}}, NextStages: []string{ - string(StageIDRunning), string(StageIDDeployFailed), + string(StageIDStarting), string(StageIDDeployFailed), }, Fatal: false, } @@ -105,20 +115,33 @@ var deployFailedLifecycleStage = step.LifecycleStage{ FinishedName: "deploy failed", Fatal: true, } -var runningLifecycleStage = step.LifecycleStage{ - ID: string(StageIDRunning), - WaitingName: "waiting for start", - RunningName: "running", - FinishedName: "completed", + +var startingLifecycleStage = step.LifecycleStage{ + ID: string(StageIDStarting), + WaitingName: "waiting to start", + RunningName: "starting", + FinishedName: "started", InputFields: map[string]struct{}{ //nolint:godox // TODO: Add wait_for here. Empty struct. "input": {}, }, + NextStages: []string{ + string(StageIDRunning), string(StageIDCrashed), + }, +} + +var runningLifecycleStage = step.LifecycleStage{ + ID: string(StageIDRunning), + WaitingName: "waiting to run", + RunningName: "running", + FinishedName: "completed", + InputFields: map[string]struct{}{}, NextStages: []string{ string(StageIDOutput), string(StageIDCrashed), }, } + var cancelledLifecycleStage = step.LifecycleStage{ ID: string(StageIDCancelled), WaitingName: "waiting for stop condition", @@ -150,6 +173,7 @@ func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { Stages: []step.LifecycleStage{ deployingLifecycleStage, deployFailedLifecycleStage, + startingLifecycleStage, runningLifecycleStage, cancelledLifecycleStage, finishedLifecycleStage, @@ -272,7 +296,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st schema.NewObjectSchema( "DeployError", map[string]*schema.PropertySchema{ - "error": schema.NewPropertySchema( + errorStr: schema.NewPropertySchema( schema.NewStringSchema(nil, nil, nil), nil, true, @@ -291,7 +315,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st }, }, { - LifecycleStage: runningLifecycleStage, + LifecycleStage: startingLifecycleStage, InputSchema: map[string]*schema.PropertySchema{ //nolint:godox // TODO: Add wait_for right here. Should be an any type. @@ -308,6 +332,11 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st ), }, }, + { + LifecycleStage: runningLifecycleStage, + InputSchema: nil, + Outputs: nil, + }, { LifecycleStage: cancelledLifecycleStage, InputSchema: map[string]*schema.PropertySchema{ @@ -417,6 +446,7 @@ func (r *runnableStep) Start(input map[string]any, stageChangeHandler step.Stage step: stepID, state: step.RunningStepStateStarting, localDeployer: r.localDeployer, + executionChannel: make(chan executionResult), } go s.run() @@ -444,6 +474,13 @@ type runningStep struct { useLocalDeployer bool localDeployer deployer.Connector container deployer.Plugin + executionChannel chan executionResult +} + +func (r *runningStep) CurrentStage() string { + r.lock.Lock() + defer r.lock.Unlock() + return string(r.currentStage) } func (r *runningStep) State() step.RunningStepState { @@ -453,11 +490,16 @@ func (r *runningStep) State() step.RunningStepState { } func (r *runningStep) ProvideStageInput(stage string, input map[string]any) error { + // If you change the running step's Stage in this function it can + // affect the counting of step states in the workflow's Execute function + // and notifySteps function. r.lock.Lock() // Checks which stage it is getting input for switch stage { case string(StageIDDeploy): + // input provided on this call overwrites the deployer configuration + // set at this plugin provider's instantiation if r.deployInputAvailable { r.lock.Unlock() return fmt.Errorf("deployment information provided more than once") @@ -474,6 +516,7 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro r.useLocalDeployer = true } // Make sure we transition the state before unlocking so there are no race conditions. + r.deployInputAvailable = true if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDDeploy { r.state = step.RunningStepStateRunning @@ -482,7 +525,7 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro // Feed the deploy step its input. r.deployInput <- unserializedDeployerConfig return nil - case string(StageIDRunning): + case string(StageIDStarting): if r.runInputAvailable { r.lock.Unlock() return fmt.Errorf("input provided more than once") @@ -497,7 +540,7 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro } // Make sure we transition the state before unlocking so there are no race conditions. r.runInputAvailable = true - if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDRunning { + if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDStarting { r.state = step.RunningStepStateRunning } // Unlock before passing the data over the channel to prevent a deadlock. @@ -506,6 +549,9 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro // Feed the run step its input over the channel. r.runInput <- input["input"] return nil + case string(StageIDRunning): + r.lock.Unlock() + return nil case string(StageIDCancelled): if input["stop_if"] != false && input["stop_if"] != nil { r.logger.Infof("Cancelling step %s", r.step) @@ -528,12 +574,6 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro } } -func (r *runningStep) CurrentStage() string { - r.lock.Lock() - defer r.lock.Unlock() - return string(r.currentStage) -} - func (r *runningStep) Close() error { r.cancel() r.lock.Lock() @@ -570,7 +610,11 @@ func (r *runningStep) run() { r.container = container } r.lock.Unlock() - if err := r.runStage(container); err != nil { + if err := r.startStage(container); err != nil { + r.startFailed(err) + return + } + if err := r.runStage(); err != nil { r.runFailed(err) } } @@ -582,15 +626,18 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) { } else { r.state = step.RunningStepStateRunning } + deployInputAvailable := r.deployInputAvailable r.lock.Unlock() + r.stageChangeHandler.OnStageChange( r, nil, nil, nil, string(StageIDDeploy), - r.runInputAvailable, + deployInputAvailable, ) + var deployerConfig any var useLocalDeployer bool select { @@ -624,27 +671,33 @@ type executionResult struct { err error } -func (r *runningStep) runStage(container deployer.Plugin) error { +func (r *runningStep) startStage(container deployer.Plugin) error { r.lock.Lock() previousStage := string(r.currentStage) - r.currentStage = StageIDRunning + r.currentStage = StageIDStarting + if !r.runInputAvailable { r.state = step.RunningStepStateWaitingForInput } else { r.state = step.RunningStepStateRunning } + runInputAvailable := r.runInputAvailable r.lock.Unlock() + r.stageChangeHandler.OnStageChange( r, &previousStage, nil, nil, - string(StageIDRunning), - r.runInputAvailable, + string(StageIDStarting), + runInputAvailable, ) + r.lock.Lock() + r.currentStage = StageIDStarting r.state = step.RunningStepStateWaitingForInput r.lock.Unlock() + var runInput any select { case runInput = <-r.runInput: @@ -672,17 +725,34 @@ func (r *runningStep) runStage(container deployer.Plugin) error { // Runs the ATP client in a goroutine in order to wait for it or context done. // On context done, the deployer tries to end execution. That will shut down // (with sigterm) the container. Then wait for output, or error out. - executionChannel := make(chan executionResult) go func() { outputID, outputData, err := atpClient.Execute(r.step, runInput) - executionChannel <- executionResult{outputID, outputData, err} + r.executionChannel <- executionResult{outputID, outputData, err} }() + return nil +} + +func (r *runningStep) runStage() error { + r.lock.Lock() + previousStage := string(r.currentStage) + r.currentStage = StageIDRunning + r.state = step.RunningStepStateRunning + r.lock.Unlock() + + r.stageChangeHandler.OnStageChange( + r, + &previousStage, + nil, + nil, + string(StageIDRunning), + false, + ) var result executionResult select { - case result = <-executionChannel: + case result = <-r.executionChannel: if result.err != nil { - return err + return result.err } case <-r.ctx.Done(): // In this case, it is being instructed to stop. @@ -690,7 +760,7 @@ func (r *runningStep) runStage(container deployer.Plugin) error { r.logger.Debugf("Running step context done before step run complete. Cancelling and waiting for result.") r.cancel() // If necessary, you can add a timeout here for shutdowns that take too long. - result = <-executionChannel + result = <-r.executionChannel } // Execution complete, move to finished stage. @@ -703,6 +773,7 @@ func (r *runningStep) runStage(container deployer.Plugin) error { // This is so it properly steps through all the stages it needs to. r.state = step.RunningStepStateRunning r.lock.Unlock() + r.stageChangeHandler.OnStageChange( r, &previousStage, @@ -710,6 +781,7 @@ func (r *runningStep) runStage(container deployer.Plugin) error { nil, string(r.currentStage), false) + r.lock.Lock() r.state = step.RunningStepStateFinished r.lock.Unlock() @@ -731,7 +803,7 @@ func (r *runningStep) deployFailed(err error) { // First running, then finished. You can't skip states. r.state = step.RunningStepStateRunning r.lock.Unlock() - r.logger.Warningf("Plugin %s deploy failed. %v", r.step, err) + r.stageChangeHandler.OnStageChange( r, &previousStage, @@ -740,13 +812,15 @@ func (r *runningStep) deployFailed(err error) { string(StageIDDeployFailed), false, ) + r.logger.Warningf("Plugin %s deploy failed. %v", r.step, err) // Now it's done. r.lock.Lock() + r.currentStage = StageIDDeployFailed r.state = step.RunningStepStateFinished r.lock.Unlock() - outputID := "error" + outputID := errorStr output := any(DeployFailed{ Error: err.Error(), }) @@ -758,14 +832,49 @@ func (r *runningStep) deployFailed(err error) { ) } +func (r *runningStep) startFailed(err error) { + r.lock.Lock() + previousStage := string(r.currentStage) + r.currentStage = StageIDCrashed + r.lock.Unlock() + + r.stageChangeHandler.OnStageChange( + r, + &previousStage, + nil, + nil, + string(r.currentStage), + false) + r.logger.Warningf("Plugin step %s start failed. %v", r.step, err) + + // Now it's done. + r.lock.Lock() + r.currentStage = StageIDCrashed + r.state = step.RunningStepStateFinished + r.lock.Unlock() + + outputID := errorStr + output := any(Crashed{ + Output: err.Error(), + }) + r.stageChangeHandler.OnStepComplete( + r, + string(r.currentStage), + &outputID, + &output, + ) +} + func (r *runningStep) runFailed(err error) { + // A current lack of observability into the atp client prevents + // non-fragile testing of this function. + r.lock.Lock() previousStage := string(r.currentStage) r.currentStage = StageIDCrashed // Don't forget to update this, or else it will behave very oddly. // First running, then finished. You can't skip states. r.state = step.RunningStepStateRunning r.lock.Unlock() - r.logger.Warningf("Plugin step %s run failed. %v", r.step, err) r.stageChangeHandler.OnStageChange( r, @@ -775,12 +884,15 @@ func (r *runningStep) runFailed(err error) { string(r.currentStage), false) + r.logger.Warningf("Plugin step %s run failed. %v", r.step, err) + // Now it's done. r.lock.Lock() + r.currentStage = StageIDCrashed r.state = step.RunningStepStateFinished r.lock.Unlock() - outputID := "error" + outputID := errorStr output := any(Crashed{ Output: err.Error(), }) diff --git a/internal/step/plugin/provider_test.go b/internal/step/plugin/provider_test.go index c3f71fb8..de3d797f 100644 --- a/internal/step/plugin/provider_test.go +++ b/internal/step/plugin/provider_test.go @@ -1 +1,383 @@ package plugin_test + +import ( + "fmt" + "go.arcalot.io/assert" + "go.arcalot.io/log/v2" + "go.flow.arcalot.io/deployer" + deployer_registry "go.flow.arcalot.io/deployer/registry" + "go.flow.arcalot.io/engine/internal/step" + "go.flow.arcalot.io/engine/internal/step/plugin" + testdeployer "go.flow.arcalot.io/testdeployer" + "testing" +) + +type deployFailStageChangeHandler struct { + message chan string +} + +func (s *deployFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) { + +} + +func (s *deployFailStageChangeHandler) OnStepComplete( + _ step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, +) { + if previousStage != string(plugin.StageIDDeployFailed) { + panic(fmt.Errorf("invalid previous stage: %s", previousStage)) + } + if previousStageOutputID == nil { + panic(fmt.Errorf("no previous stage output ID")) + } + if *previousStageOutputID != "error" { + panic(fmt.Errorf("invalid previous stage output ID: %s", *previousStageOutputID)) + } + if previousStageOutput == nil { + panic(fmt.Errorf("no previous stage output ID")) + } + message := (*previousStageOutput).(plugin.DeployFailed).Error + + s.message <- message +} + +type startFailStageChangeHandler struct { + message chan string +} + +func (s *startFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) { + +} + +func (s *startFailStageChangeHandler) OnStepComplete( + _ step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, +) { + if previousStage != string(plugin.StageIDCrashed) { + panic(fmt.Errorf("invalid previous stage: %s", previousStage)) + } + if previousStageOutputID == nil { + panic(fmt.Errorf("no previous stage output ID")) + } + if *previousStageOutputID != "error" { + panic(fmt.Errorf("invalid previous stage output ID: %s", *previousStageOutputID)) + } + if previousStageOutput == nil { + panic(fmt.Errorf("no previous stage output ID")) + } + + message := (*previousStageOutput).(plugin.Crashed).Output + + s.message <- message +} + +type stageChangeHandler struct { + message chan string +} + +func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) { + +} + +func (s *stageChangeHandler) OnStepComplete( + _ step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, +) { + if previousStage != string(plugin.StageIDOutput) { + panic(fmt.Errorf("invalid previous stage: %s", previousStage)) + } + if previousStageOutputID == nil { + panic(fmt.Errorf("no previous stage output ID")) + } + if *previousStageOutputID != "success" { + panic(fmt.Errorf("invalid previous stage output ID: %s", *previousStageOutputID)) + } + if previousStageOutput == nil { + panic(fmt.Errorf("no previous stage output ID")) + } + message := (*previousStageOutput).(map[any]any)["message"].(string) + + s.message <- message +} + +func TestProvider_Utility(t *testing.T) { + workflowDeployerCfg := map[string]any{ + "type": "test-impl", + } + + plp, err := plugin.New( + log.New( + log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + }, + ), + deployer_registry.New( + deployer.Any(testdeployer.NewFactory())), + workflowDeployerCfg, + ) + assert.NoError(t, err) + assert.Equals(t, plp.Kind(), "plugin") + assert.NotNil(t, plp.ProviderSchema()) + assert.NotNil(t, plp.RunProperties()) + assert.NotNil(t, plp.Lifecycle()) + + stepSchema := map[string]any{ + "plugin": "simulation", + } + byteSchema := map[string][]byte{} + + runnable, err := plp.LoadSchema(stepSchema, byteSchema) + assert.NoError(t, err) + + assert.NotNil(t, runnable.RunSchema()) + + _, err = runnable.Lifecycle(map[string]any{"step": "wait"}) + assert.NoError(t, err) + + _, err = runnable.Lifecycle(map[string]any{"step": nil}) + assert.NoError(t, err) +} + +func TestProvider_HappyError(t *testing.T) { + logger := log.New( + log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + }, + ) + workflowDeployerCfg := map[string]any{ + "type": "test-impl", + } + + deployerRegistry := deployer_registry.New( + deployer.Any(testdeployer.NewFactory())) + + _, err := plugin.New( + logger, + deployerRegistry, + map[string]any{"deployer_cfg": "bad"}, + ) + assert.Error(t, err) + + plp, err := plugin.New( + logger, + deployerRegistry, + workflowDeployerCfg, + ) + assert.NoError(t, err) + + runnable, err := plp.LoadSchema( + map[string]any{"plugin": "simulation"}, map[string][]byte{}) + assert.NoError(t, err) + + handler := &stageChangeHandler{ + message: make(chan string), + } + + // start with a step id that is not in the schema + _, err = runnable.Start(map[string]any{"step": "wrong_stepid"}, handler) + assert.Error(t, err) + + // default step id + running, err := runnable.Start(map[string]any{"step": nil}, handler) + assert.NoError(t, err) + + // non-existent stage + assert.Error(t, running.ProvideStageInput( + "", nil)) + + // unserialize malformed deploy schema + assert.Error(t, running.ProvideStageInput( + string(plugin.StageIDDeploy), + map[string]any{"deploy": map[string]any{ + "type": "test-impl", + "deploy_time": "abc"}}, + )) + + assert.NoError(t, running.ProvideStageInput( + string(plugin.StageIDDeploy), + map[string]any{"deploy": map[string]any{ + "type": "test-impl", + "deploy_time": 1}}, + )) + + // provide deploy input a 2nd time + assert.Error(t, running.ProvideStageInput( + string(plugin.StageIDDeploy), + map[string]any{"deploy": map[string]any{ + "type": "test-impl", + "deploy_time": nil}}, + )) + + // unserialize nil input schema error + assert.Error(t, running.ProvideStageInput( + string(plugin.StageIDStarting), + map[string]any{"input": nil}, + )) + + // unserialize malformed input schema + assert.Error(t, running.ProvideStageInput( + string(plugin.StageIDStarting), + map[string]any{"input": 1}, + )) + + waitTimeMs := 50 + assert.NoError(t, running.ProvideStageInput( + string(plugin.StageIDStarting), + map[string]any{"input": map[string]any{"wait_time_ms": waitTimeMs}}, + )) + + // provide running input a 2nd time + assert.Error(t, running.ProvideStageInput( + string(plugin.StageIDStarting), + map[string]any{"input": map[string]any{"wait_time_ms": waitTimeMs}}, + )) + + message := <-handler.message + assert.Equals(t, + message, + fmt.Sprintf("Plugin slept for %d ms.", waitTimeMs)) + + assert.Equals(t, string(running.State()), + string(step.RunningStepStateFinished)) + + assert.Equals(t, running.CurrentStage(), string(plugin.StageIDOutput)) + + t.Cleanup(func() { + assert.NoError(t, running.Close()) + }) +} + +func TestProvider_DeployFail(t *testing.T) { + logConfig := log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + + deployTimeMs := 20 + workflowDeployerCfg := map[string]any{ + "type": "test-impl", + "deploy_time": deployTimeMs, + "deploy_succeed": true, + } + + deployerRegistry := deployer_registry.New( + deployer.Any(testdeployer.NewFactory())) + + plp, err := plugin.New( + logger, + deployerRegistry, + workflowDeployerCfg, + ) + assert.NoError(t, err) + + runnable, err := plp.LoadSchema( + map[string]any{"plugin": "simulation"}, map[string][]byte{}) + assert.NoError(t, err) + + handler := &deployFailStageChangeHandler{ + message: make(chan string), + } + + // default step id + running, err := runnable.Start(map[string]any{"step": nil}, handler) + assert.NoError(t, err) + + assert.NoError(t, running.ProvideStageInput( + string(plugin.StageIDDeploy), + map[string]any{"deploy": map[string]any{ + "type": "test-impl", + "deploy_succeed": false, + "deploy_time": deployTimeMs}}, + )) + + waitTimeMs := 50 + assert.NoError(t, running.ProvideStageInput( + string(plugin.StageIDStarting), + map[string]any{"input": map[string]any{"wait_time_ms": waitTimeMs}}, + )) + + message := <-handler.message + assert.Equals(t, + message, + fmt.Sprintf("intentional deployment fail after %d ms", deployTimeMs)) + + assert.Equals(t, string(running.State()), + string(step.RunningStepStateFinished)) + + assert.Equals(t, running.CurrentStage(), string(plugin.StageIDDeployFailed)) + + t.Cleanup(func() { + assert.NoError(t, running.Close()) + }) +} + +func TestProvider_StartFail(t *testing.T) { + logger := log.New( + log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + }, + ) + deployTimeMs := 20 + workflowDeployerCfg := map[string]any{ + "type": "test-impl", + "deploy_time": deployTimeMs, + "deploy_succeed": true, + } + + plp, err := plugin.New( + logger, + deployer_registry.New( + deployer.Any(testdeployer.NewFactory())), + workflowDeployerCfg, + ) + assert.NoError(t, err) + + runnable, err := plp.LoadSchema( + map[string]any{"plugin": "simulation"}, + map[string][]byte{}) + assert.NoError(t, err) + + handler := &startFailStageChangeHandler{ + message: make(chan string), + } + + running, err := runnable.Start(map[string]any{"step": "wait"}, handler) + assert.NoError(t, err) + + // tell deployer that this run should not succeed + assert.NoError(t, running.ProvideStageInput( + string(plugin.StageIDDeploy), + map[string]any{"deploy": map[string]any{ + "type": "test-impl", + "deploy_succeed": true, + "deploy_time": deployTimeMs, + "disable_plugin_writes": true}}, + )) + + assert.NoError(t, running.ProvideStageInput( + string(plugin.StageIDStarting), + map[string]any{"input": map[string]any{ + "wait_time_ms": 50}}, + )) + + // wait for message, but we don't care about its value + <-handler.message + + assert.Equals(t, running.CurrentStage(), string(plugin.StageIDCrashed)) + + t.Cleanup(func() { + assert.NoError(t, running.Close()) + }) +} diff --git a/internal/step/provider.go b/internal/step/provider.go index ef21398a..3e173675 100644 --- a/internal/step/provider.go +++ b/internal/step/provider.go @@ -1,6 +1,8 @@ package step -import "go.flow.arcalot.io/pluginsdk/schema" +import ( + "go.flow.arcalot.io/pluginsdk/schema" +) // Provider is the description of an item that fits in a workflow. Its implementation provide the // basis for workflow execution.