diff --git a/go.mod b/go.mod index 2200f383..9cf3f781 100644 --- a/go.mod +++ b/go.mod @@ -4,17 +4,17 @@ go 1.21 require ( go.arcalot.io/assert v1.8.0 - go.arcalot.io/dgraph v1.4.1 + go.arcalot.io/dgraph v1.5.0 go.arcalot.io/lang v1.1.0 - go.arcalot.io/log/v2 v2.1.0 + go.arcalot.io/log/v2 v2.2.0 go.flow.arcalot.io/deployer v0.6.1 go.flow.arcalot.io/dockerdeployer v0.7.2 go.flow.arcalot.io/expressions v0.4.3 go.flow.arcalot.io/kubernetesdeployer v0.9.3 - go.flow.arcalot.io/pluginsdk v0.12.1 + go.flow.arcalot.io/pluginsdk v0.12.5 go.flow.arcalot.io/podmandeployer v0.11.2 go.flow.arcalot.io/pythondeployer v0.6.1 - go.flow.arcalot.io/testdeployer v0.6.1 + go.flow.arcalot.io/testdeployer v0.6.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -29,7 +29,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.12.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fxamacker/cbor/v2 v2.6.0 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect @@ -57,7 +57,7 @@ require ( github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/x448/float16 v0.8.4 // indirect go.arcalot.io/exex v0.2.0 // indirect - go.flow.arcalot.io/testplugin v0.4.1 // indirect + go.flow.arcalot.io/testplugin v0.4.2 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect @@ -66,8 +66,8 @@ require ( go.opentelemetry.io/proto/otlp v1.2.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.20.0 // indirect - golang.org/x/sys v0.21.0 // indirect - golang.org/x/term v0.21.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/term v0.23.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/grpc v1.64.0 // indirect diff --git a/go.sum b/go.sum index 147337d3..3ffc7871 100644 --- a/go.sum +++ b/go.sum @@ -25,8 +25,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/fxamacker/cbor/v2 v2.6.0 h1:sU6J2usfADwWlYDAFhZBQ6TnLFBHxgesMrQfQgk1tWA= -github.com/fxamacker/cbor/v2 v2.6.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= +github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -123,14 +123,14 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.arcalot.io/assert v1.8.0 h1:hGcHMPncQXwQvjj7MbyOu2gg8VIBB00crUJZpeQOjxs= go.arcalot.io/assert v1.8.0/go.mod h1:nNmWPoNUHFyrPkNrD2aASm5yPuAfiWdB/4X7Lw3ykHk= -go.arcalot.io/dgraph v1.4.1 h1:y/lhJ68WzNUDR2BYSk7tZAZhVokts92svcrJLbK4Ebo= -go.arcalot.io/dgraph v1.4.1/go.mod h1:+Kxc81utiihMSmC1/ttSPGLDlWPpvgOpNxSFmIDPxFM= +go.arcalot.io/dgraph v1.5.0 h1:6cGlxLzmmehJoD/nj0Hkql7uh90EU0A0GtZhGYkr28M= +go.arcalot.io/dgraph v1.5.0/go.mod h1:+Kxc81utiihMSmC1/ttSPGLDlWPpvgOpNxSFmIDPxFM= go.arcalot.io/exex v0.2.0 h1:u44pjwPwcH57TF8knhaqVZP/1V/KbnRe//pKzMwDpLw= go.arcalot.io/exex v0.2.0/go.mod h1:5zlFr+7vOQNZKYCNOEDdsad+z/dlvXKs2v4kG+v+bQo= go.arcalot.io/lang v1.1.0 h1:ugglRKpd3qIMkdghAjKJxsziIgHm8QpxrzZPSXoa08I= go.arcalot.io/lang v1.1.0/go.mod h1:2BZJO4csY7NnN/Nf1+eTdIQH4A2vxtOMneaO+PJl+Co= -go.arcalot.io/log/v2 v2.1.0 h1:lNO931hJ82LgS6WcCFCxpLWXQXPFhOkz6PyAJ/augq4= -go.arcalot.io/log/v2 v2.1.0/go.mod h1:PNWOSkkPmgS2OMlWTIlB/WqOw0yaBvDYd8ENAP80H4k= +go.arcalot.io/log/v2 v2.2.0 h1:a4wVAqQ/6zFyEG6I9Dnd7eFA52KKGaBK3hj9PgZ/e0c= +go.arcalot.io/log/v2 v2.2.0/go.mod h1:h/Hlyz6wH+mjRUKdL3W2fG2oMrAm2qgPxb0rNJJDUuY= go.flow.arcalot.io/deployer v0.6.1 h1:Q65VHeRZzdrMJZqTRb26EQZQbK+C3pORETVlpw02xWQ= go.flow.arcalot.io/deployer v0.6.1/go.mod h1:Oh+71KYQEof6IS3UGhpMyjQQPRcuomUccn7fwAqrPxE= go.flow.arcalot.io/dockerdeployer v0.7.2 h1:+vUU1f0+wRYAqI41jPI1MUQFU3yknu49qsPGTaxvGug= @@ -139,16 +139,22 @@ go.flow.arcalot.io/expressions v0.4.3 h1:0BRRghutHp0sctsITHe/A1le0yYiJtKNTxm27T+ go.flow.arcalot.io/expressions v0.4.3/go.mod h1:UORX78N4ep71wOzNXdIo/UY+6SdDD0id0mvuRNEQMeM= go.flow.arcalot.io/kubernetesdeployer v0.9.3 h1:XKiqmCqXb6ZLwP5IQTAKS/gJHpq0Ub/yEjCfgAwQF2A= go.flow.arcalot.io/kubernetesdeployer v0.9.3/go.mod h1:DtB6HR7HBt/HA1vME0faIpOQ/lhfBJjL6OAGgT3Bu/Q= -go.flow.arcalot.io/pluginsdk v0.12.1 h1:HlWo1+Fn13u0EoeH9KpcWzdn1miqucNEvj0cirFhcA8= -go.flow.arcalot.io/pluginsdk v0.12.1/go.mod h1:J0RYsfD6g1WKMVSbLGZR/ZJBVdcjt+bOuKoOvnNPpy4= +go.flow.arcalot.io/pluginsdk v0.12.3 h1:TQyzRltJ92ZiRv3JMkVKywu04xhfIq+aFAlH4oSfK0I= +go.flow.arcalot.io/pluginsdk v0.12.3/go.mod h1:5kMCVigP89J/KU5T72EAczQXPWXZkRfUFcpnIkOECV8= +go.flow.arcalot.io/pluginsdk v0.12.5 h1:9b3pKeoHCRH4yF2xIdu8L7SqNUYJFxTsk8L5+inR1xw= +go.flow.arcalot.io/pluginsdk v0.12.5/go.mod h1:5kMCVigP89J/KU5T72EAczQXPWXZkRfUFcpnIkOECV8= go.flow.arcalot.io/podmandeployer v0.11.2 h1:aqrHaNaCXYDREqDJpKhVeVIIZPiPld5SDvnUcc0Tjiw= go.flow.arcalot.io/podmandeployer v0.11.2/go.mod h1:70+9M6eVQa0EEynDZ720P3AEzXt1gZto2lvoMlyjuzo= go.flow.arcalot.io/pythondeployer v0.6.1 h1:IyaA9BVfHJ2fhC+fNfT6VicrtRGFlZOlSaAVGGPwo1E= go.flow.arcalot.io/pythondeployer v0.6.1/go.mod h1:gfEZtkDR/UURlwexuH0jARykh6hvFz1V5dC32Ub/YaM= go.flow.arcalot.io/testdeployer v0.6.1 h1:Bo0bP1U4QoMEMCHCOBVBTENpIlB8P19/5sh609r8mek= go.flow.arcalot.io/testdeployer v0.6.1/go.mod h1:FLVrEhYWH1a9RvIgtjZ88lRaodP+bOGEFqDqtRuN8B0= +go.flow.arcalot.io/testdeployer v0.6.2 h1:Bys5WJvHakXz543DsoRPJNBDS6MQUZca2STiaE6ERjM= +go.flow.arcalot.io/testdeployer v0.6.2/go.mod h1:0GooNIfYPWSMLlN6RDJjtsoekQGZRh0iVyRFDxwea1g= go.flow.arcalot.io/testplugin v0.4.1 h1:czJFi2kg54as9qVbDPW4cjrRcF3lyKyPlBew+Fmy5sE= go.flow.arcalot.io/testplugin v0.4.1/go.mod h1:yU+8dTuaqPlxNkceLN9AOAwe5lG+nsztCZYgEMmw3sc= +go.flow.arcalot.io/testplugin v0.4.2 h1:JTS8AIXntJOuwTfXwUmrXV2v2Wk7z48TYvC52fJPdkE= +go.flow.arcalot.io/testplugin v0.4.2/go.mod h1:yU+8dTuaqPlxNkceLN9AOAwe5lG+nsztCZYgEMmw3sc= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 h1:9l89oX4ba9kHbBol3Xin3leYJ+252h0zszDtBwyKe2A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0/go.mod h1:XLZfZboOJWHNKUv7eH0inh0E9VV6eWDFB/9yJyTLPp0= go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= @@ -185,10 +191,14 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= +golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= diff --git a/internal/step/dummy/provider_test.go b/internal/step/dummy/provider_test.go index 622896e4..fd6e0fc1 100644 --- a/internal/step/dummy/provider_test.go +++ b/internal/step/dummy/provider_test.go @@ -14,6 +14,9 @@ type stageChangeHandler struct { message chan string } +func (s *stageChangeHandler) OnStepStageFailure(_ step.RunningStep, _ string, _ *sync.WaitGroup, _ error) { +} + func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) { } diff --git a/internal/step/foreach/provider.go b/internal/step/foreach/provider.go index f2ded8b4..906db40f 100644 --- a/internal/step/foreach/provider.go +++ b/internal/step/foreach/provider.go @@ -3,6 +3,7 @@ package foreach import ( "context" "fmt" + "go.arcalot.io/dgraph" "reflect" "sync" @@ -46,9 +47,9 @@ var executeLifecycleStage = step.LifecycleStage{ "items": {}, "wait_for": {}, }, - NextStages: []string{ - string(StageIDOutputs), - string(StageIDFailed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDOutputs): dgraph.AndDependency, + string(StageIDFailed): dgraph.CompletionAndDependency, }, Fatal: false, } @@ -58,7 +59,7 @@ var outputLifecycleStage = step.LifecycleStage{ RunningName: "output", FinishedName: "output", InputFields: map[string]struct{}{}, - NextStages: []string{}, + NextStages: map[string]dgraph.DependencyType{}, Fatal: false, } var errorLifecycleStage = step.LifecycleStage{ @@ -67,7 +68,7 @@ var errorLifecycleStage = step.LifecycleStage{ RunningName: "processing error", FinishedName: "error", InputFields: map[string]struct{}{}, - NextStages: []string{}, + NextStages: map[string]dgraph.DependencyType{}, Fatal: true, } @@ -156,7 +157,7 @@ func (l *forEachProvider) LoadSchema(inputs map[string]any, workflowContext map[ return nil, err } - executor, err := l.executorFactory(l.logger) + executor, err := l.executorFactory(l.logger.WithLabel("subworkflow", workflowFileName.(string))) if err != nil { return nil, err } @@ -359,6 +360,7 @@ type runningStep struct { inputAvailable bool inputData chan []any ctx context.Context + closed bool wg sync.WaitGroup cancel context.CancelFunc stageChangeHandler step.StageChangeHandler @@ -368,6 +370,11 @@ type runningStep struct { func (r *runningStep) ProvideStageInput(stage string, input map[string]any) error { r.lock.Lock() + defer r.lock.Unlock() + if r.closed { + r.logger.Debugf("exiting foreach ProvideStageInput due to step being closed") + return nil + } switch stage { case string(StageIDExecute): items := input["items"] @@ -377,30 +384,24 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro item := v.Index(i).Interface() _, err := r.workflow.Input().Unserialize(item) if err != nil { - r.lock.Unlock() return fmt.Errorf("invalid input item %d for subworkflow (%w) for run/step %s", i, err, r.runID) } input[i] = item } if r.inputAvailable { - r.lock.Unlock() return fmt.Errorf("input for execute workflow provided twice for run/step %s", r.runID) } if r.currentState == step.RunningStepStateWaitingForInput && r.currentStage == StageIDExecute { r.currentState = step.RunningStepStateRunning } r.inputAvailable = true - r.lock.Unlock() - r.inputData <- input + r.inputData <- input // Send before unlock to ensure that it never gets closed before sending. return nil case string(StageIDOutputs): - r.lock.Unlock() return nil case string(StageIDFailed): - r.lock.Unlock() return nil default: - r.lock.Unlock() return fmt.Errorf("invalid stage: %s", stage) } } @@ -418,8 +419,13 @@ func (r *runningStep) State() step.RunningStepState { } func (r *runningStep) Close() error { + r.lock.Lock() + r.closed = true + r.lock.Unlock() r.cancel() r.wg.Wait() + r.logger.Debugf("Closing inputData channel in foreach step provider") + close(r.inputData) return nil } @@ -431,7 +437,7 @@ func (r *runningStep) ForceClose() error { func (r *runningStep) run() { r.wg.Add(1) defer func() { - close(r.inputData) + r.logger.Debugf("foreach run function done") r.wg.Done() }() waitingForInput := false @@ -452,95 +458,122 @@ func (r *runningStep) run() { waitingForInput, &r.wg, ) + r.runOnInput() +} + +func (r *runningStep) runOnInput() { select { case loopData, ok := <-r.inputData: if !ok { + r.logger.Debugf("aborted waiting for result in foreach") return } + r.processInput(loopData) + case <-r.ctx.Done(): + r.logger.Debugf("context done") + return + } +} - itemOutputs := make([]any, len(loopData)) - itemErrors := make(map[int]string, len(loopData)) - - r.logger.Debugf("Executing subworkflow for step %s...", r.runID) - wg := &sync.WaitGroup{} - wg.Add(len(loopData)) - errors := false - sem := make(chan struct{}, r.parallelism) - for i, input := range loopData { - i := i - input := input - go func() { - defer func() { - <-sem - wg.Done() - }() - r.logger.Debugf("Queuing item %d...", i) - select { - case sem <- struct{}{}: - case <-r.ctx.Done(): - r.logger.Debugf("Aborting item %d execution.", i) - return - } +func (r *runningStep) processInput(inputData []any) { + r.logger.Debugf("Executing subworkflow for step %s...", r.runID) + outputs, errors := r.executeSubWorkflows(inputData) - r.logger.Debugf("Executing item %d...", i) - // Ignore the output ID here because it can only be "success" - _, outputData, err := r.workflow.Execute(r.ctx, input) - r.lock.Lock() - if err != nil { - errors = true - itemErrors[i] = err.Error() - } else { - itemOutputs[i] = outputData - } - r.lock.Unlock() - r.logger.Debugf("Item %d complete.", i) - }() + r.logger.Debugf("Subworkflow %s complete.", r.runID) + r.lock.Lock() + previousStage := string(r.currentStage) + r.currentState = step.RunningStepStateRunning + var outputID string + var outputData any + var unresolvableStage StageID + var unresolvableError error + if len(errors) > 0 { + r.currentStage = StageIDFailed + unresolvableStage = StageIDOutputs + unresolvableError = fmt.Errorf("foreach subworkflow failed with errors (%v)", errors) + outputID = "error" + dataMap := make(map[int]any, len(inputData)) + for i, entry := range outputs { + if entry != nil { + dataMap[i] = entry + } } - wg.Wait() - r.logger.Debugf("Subworkflow %s complete.", r.runID) - r.lock.Lock() - previousStage := string(r.currentStage) - r.currentState = step.RunningStepStateRunning - var outputID string - var outputData any - if errors { - r.currentStage = StageIDFailed - outputID = "error" - dataMap := make(map[int]any, len(loopData)) - for i, entry := range itemOutputs { - if entry != nil { - dataMap[i] = entry + outputData = map[string]any{ + "data": dataMap, + "messages": errors, + } + } else { + r.currentStage = StageIDOutputs + unresolvableStage = StageIDFailed + unresolvableError = fmt.Errorf("foreach succeeded, so error case is unresolvable") + outputID = "success" + outputData = map[string]any{ + "data": outputs, + } + } + currentStage := r.currentStage + r.lock.Unlock() + r.stageChangeHandler.OnStageChange( + r, + &previousStage, + nil, + nil, + string(currentStage), + false, + &r.wg, + ) + r.stageChangeHandler.OnStepStageFailure( + r, + string(unresolvableStage), + &r.wg, + unresolvableError, + ) + r.lock.Lock() + r.currentState = step.RunningStepStateFinished + previousStage = string(r.currentStage) + r.lock.Unlock() + r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg) +} + +// returns true if there is an error. +func (r *runningStep) executeSubWorkflows(inputData []any) ([]any, map[int]string) { + itemOutputs := make([]any, len(inputData)) + itemErrors := make(map[int]string, len(inputData)) + wg := &sync.WaitGroup{} + wg.Add(len(inputData)) + sem := make(chan struct{}, r.parallelism) + for i, input := range inputData { + i := i + input := input + go func() { + defer func() { + select { + case <-sem: + case <-r.ctx.Done(): // Must not deadlock if closed early. } + wg.Done() + }() + r.logger.Debugf("Queuing item %d...", i) + select { + case sem <- struct{}{}: + case <-r.ctx.Done(): + r.logger.Debugf("Aborting item %d execution.", i) + return } - outputData = map[string]any{ - "data": dataMap, - "messages": itemErrors, - } - } else { - r.currentStage = StageIDOutputs - outputID = "success" - outputData = map[string]any{ - "data": itemOutputs, + + r.logger.Debugf("Executing item %d...", i) + // Ignore the output ID here because it can only be "success" + _, outputData, err := r.workflow.Execute(r.ctx, input) + r.lock.Lock() + if err != nil { + itemErrors[i] = err.Error() + } else { + itemOutputs[i] = outputData } - } - currentStage := r.currentStage - r.lock.Unlock() - r.stageChangeHandler.OnStageChange( - r, - &previousStage, - nil, - nil, - string(currentStage), - false, - &r.wg, - ) - r.lock.Lock() - r.currentState = step.RunningStepStateFinished - previousStage = string(r.currentStage) - r.lock.Unlock() - r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg) - case <-r.ctx.Done(): - return + r.lock.Unlock() + r.logger.Debugf("Item %d complete.", i) + }() } - + wg.Wait() + return itemOutputs, itemErrors } diff --git a/internal/step/lifecycle.go b/internal/step/lifecycle.go index 83495250..700fe2fd 100644 --- a/internal/step/lifecycle.go +++ b/internal/step/lifecycle.go @@ -30,10 +30,10 @@ func (l Lifecycle[StageType]) DAG() (dgraph.DirectedGraph[StageType], error) { } } for _, stage := range l.Stages { - node := lang.Must2(dag.GetNodeByID(stage.Identifier())) - for _, nextStage := range stage.NextStageIDs() { - if err := node.Connect(nextStage); err != nil { - return nil, fmt.Errorf("failed to connect lifecycle stage %s to %s (%w)", node.ID(), nextStage, err) + for nextStage, dependencyType := range stage.NextStageIDs() { + nextStageNode := lang.Must2(dag.GetNodeByID(nextStage)) + if err := nextStageNode.ConnectDependency(stage.Identifier(), dependencyType); err != nil { + return nil, fmt.Errorf("failed to connect lifecycle stage %s to %s (%w)", stage.Identifier(), nextStage, err) } } } @@ -52,7 +52,7 @@ type lifecycleStage interface { // Identifier returns the ID of the stage. Identifier() string // NextStageIDs returns the next stage identifiers. - NextStageIDs() []string + NextStageIDs() map[string]dgraph.DependencyType } // LifecycleStage is the description of a single stage within a step lifecycle. @@ -72,7 +72,7 @@ type LifecycleStage struct { // will pause if there is no input available. // It will automatically create a DAG node between the current and the described next stages to ensure // that it is running in order. - NextStages []string + NextStages map[string]dgraph.DependencyType // Fatal indicates that this stage should be treated as fatal unless handled by the workflow. Fatal bool } @@ -83,7 +83,7 @@ func (l LifecycleStage) Identifier() string { } // NextStageIDs is a helper function that returns the next possible stages. -func (l LifecycleStage) NextStageIDs() []string { +func (l LifecycleStage) NextStageIDs() map[string]dgraph.DependencyType { return l.NextStages } diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index 23ad9798..23ed21a4 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -3,13 +3,16 @@ package plugin import ( "context" "fmt" + "go.arcalot.io/dgraph" "go.flow.arcalot.io/pluginsdk/plugin" "reflect" + "strconv" "strings" "sync" + "sync/atomic" "time" - log "go.arcalot.io/log/v2" + "go.arcalot.io/log/v2" "go.flow.arcalot.io/deployer" "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/internal/step" @@ -158,6 +161,9 @@ const ( StageIDOutput StageID = "outputs" // StageIDCrashed is a stage that indicates that the plugin has quit unexpectedly. StageIDCrashed StageID = "crashed" + // StageIDClosed is a stage that indicates that the plugin has exited due to workflow + // termination or step cancellation. + StageIDClosed StageID = "closed" // StageIDStarting is a stage that indicates that the plugin execution has begun. StageIDStarting StageID = "starting" ) @@ -168,8 +174,10 @@ var deployingLifecycleStage = step.LifecycleStage{ RunningName: "deploying", FinishedName: "deployed", InputFields: map[string]struct{}{string(StageIDDeploy): {}}, - NextStages: []string{ - string(StageIDStarting), string(StageIDDeployFailed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDStarting): dgraph.AndDependency, + string(StageIDDeployFailed): dgraph.CompletionAndDependency, + string(StageIDClosed): dgraph.CompletionAndDependency, }, Fatal: false, } @@ -189,8 +197,11 @@ var enablingLifecycleStage = step.LifecycleStage{ InputFields: map[string]struct{}{ "enabled": {}, }, - NextStages: []string{ - string(StageIDStarting), string(StageIDDisabled), string(StageIDCrashed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDStarting): dgraph.AndDependency, + string(StageIDDisabled): dgraph.AndDependency, + string(StageIDCrashed): dgraph.CompletionAndDependency, + string(StageIDClosed): dgraph.CompletionAndDependency, }, } @@ -200,11 +211,14 @@ var startingLifecycleStage = step.LifecycleStage{ RunningName: "starting", FinishedName: "started", InputFields: map[string]struct{}{ - "input": {}, - "wait_for": {}, + "input": {}, + "wait_for": {}, + "closure_wait_timeout": {}, }, - NextStages: []string{ - string(StageIDRunning), string(StageIDCrashed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDRunning): dgraph.AndDependency, + string(StageIDCrashed): dgraph.CompletionAndDependency, + string(StageIDClosed): dgraph.CompletionAndDependency, }, } @@ -214,8 +228,10 @@ var runningLifecycleStage = step.LifecycleStage{ RunningName: "running", FinishedName: "completed", InputFields: map[string]struct{}{}, - NextStages: []string{ - string(StageIDOutput), string(StageIDCrashed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDOutput): dgraph.AndDependency, + string(StageIDCrashed): dgraph.CompletionAndDependency, + string(StageIDClosed): dgraph.CompletionAndDependency, }, } @@ -227,8 +243,11 @@ var cancelledLifecycleStage = step.LifecycleStage{ InputFields: map[string]struct{}{ "stop_if": {}, }, - NextStages: []string{ - string(StageIDOutput), string(StageIDCrashed), string(StageIDDeployFailed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDOutput): dgraph.CompletionAndDependency, + string(StageIDCrashed): dgraph.CompletionAndDependency, + string(StageIDDeployFailed): dgraph.CompletionAndDependency, + string(StageIDClosed): dgraph.CompletionAndDependency, }, } @@ -252,6 +271,12 @@ var crashedLifecycleStage = step.LifecycleStage{ RunningName: "crashed", FinishedName: "crashed", } +var closedLifecycleStage = step.LifecycleStage{ + ID: string(StageIDClosed), + WaitingName: "closed", + RunningName: "closed", + FinishedName: "closed", +} // Lifecycle returns a lifecycle that contains all plugin lifecycle stages. func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { @@ -267,6 +292,7 @@ func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { disabledLifecycleStage, finishedLifecycleStage, crashedLifecycleStage, + closedLifecycleStage, }, } } @@ -292,7 +318,7 @@ func (p *pluginProvider) LoadSchema(inputs map[string]any, _ map[string][]byte) requestedDeploymentType, pluginSource, err) } // Set up the ATP connection - transport := atp.NewClientWithLogger(pluginConnector, p.logger) + transport := atp.NewClientWithLogger(pluginConnector, p.logger.WithLabel("source", "atp-client")) // Read the schema information s, err := transport.ReadSchema() if err != nil { @@ -421,6 +447,26 @@ func (r *runnableStep) DisabledOutputSchema() *schema.StepOutputSchema { ) } +const defaultClosureTimeout = 5000 + +func closureTimeoutSchema() *schema.PropertySchema { + return schema.NewPropertySchema( + schema.NewIntSchema(schema.PointerTo(int64(0)), nil, nil), + schema.NewDisplayValue( + schema.PointerTo("closure wait timeout"), + schema.PointerTo("The amount of milliseconds to wait after sending the cancel "+ + "signal on closure before force killing the step."), + nil, + ), + false, + nil, + nil, + nil, + schema.PointerTo(strconv.FormatInt(defaultClosureTimeout, 10)), + nil, + ) +} + func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[step.LifecycleStageWithSchema], err error) { rawStepID, ok := input["step"] if !ok || rawStepID == nil { @@ -568,6 +614,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st nil, nil, ), + "closure_wait_timeout": closureTimeoutSchema(), }, Outputs: map[string]*schema.StepOutputSchema{ "started": r.StartedSchema(), @@ -624,6 +671,51 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st }, }, }, + { + LifecycleStage: closedLifecycleStage, + InputSchema: nil, + Outputs: map[string]*schema.StepOutputSchema{ + "result": { + SchemaValue: schema.NewScopeSchema( + schema.NewObjectSchema( + "ClosedInfo", + map[string]*schema.PropertySchema{ + "cancelled": schema.NewPropertySchema( + schema.NewBoolSchema(), + schema.NewDisplayValue( + schema.PointerTo("cancelled"), + schema.PointerTo("Whether the step was cancelled by stop_if"), + nil, + ), + true, + nil, + nil, + nil, + nil, + nil, + ), + "close_requested": schema.NewPropertySchema( + schema.NewBoolSchema(), + schema.NewDisplayValue( + schema.PointerTo("close requested"), + schema.PointerTo("Whether the step was closed with Close()"), + nil, + ), + true, + nil, + nil, + nil, + nil, + nil, + ), + }, + ), + ), + DisplayValue: nil, + ErrorValue: true, + }, + }, + }, }, }, nil } @@ -674,7 +766,7 @@ func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHand ctx: ctx, cancel: cancel, deployInput: make(chan any, 1), - runInput: make(chan any, 1), + runInput: make(chan runInput, 1), enabledInput: make(chan bool, 1), logger: r.logger, deploymentType: r.deploymentType, @@ -682,17 +774,23 @@ func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHand pluginStepID: stepID, state: step.RunningStepStateStarting, localDeployer: r.localDeployer, - executionChannel: make(chan atp.ExecutionResult), - signalToStep: make(chan schema.Input), - signalFromStep: make(chan schema.Input), + executionChannel: make(chan atp.ExecutionResult, 2), // Buffer in case a result happens after context done. + signalToStep: make(chan schema.Input, 10), + signalFromStep: make(chan schema.Input, 10), runID: runID, } + s.wg.Add(1) // Wait for the run to finish before closing. go s.run() return s, nil } +type runInput struct { + stepInputData any + forceCloseTimeoutMS int64 +} + type runningStep struct { deployerRegistry registry.Registry stepSchema schema.Step @@ -701,12 +799,13 @@ type runningStep struct { wg sync.WaitGroup ctx context.Context cancel context.CancelFunc + cancelled bool atpClient atp.Client deployInput chan any deployInputAvailable bool enabledInput chan bool enabledInputAvailable bool - runInput chan any + runInput chan runInput runInputAvailable bool logger log.Logger currentStage StageID @@ -721,7 +820,7 @@ type runningStep struct { executionChannel chan atp.ExecutionResult signalToStep chan schema.Input // Communicates with the ATP client, not other steps. signalFromStep chan schema.Input // Communicates with the ATP client, not other steps. - closed bool + closed atomic.Bool // Store channels for sending pre-calculated signal outputs to other steps? // Store channels for receiving pre-calculated signal inputs from other steps? } @@ -746,8 +845,6 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro // and notifySteps function. r.lock.Lock() defer r.lock.Unlock() - r.logger.Debugf("ProvideStageInput START") - defer r.logger.Debugf("ProvideStageInput END") // Checks which stage it is getting input for, and handles it. switch stage { @@ -761,6 +858,8 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro return nil case string(StageIDCancelled): return r.provideCancelledInput(input) + case string(StageIDClosed): + return nil case string(StageIDDeployFailed): return nil case string(StageIDCrashed): @@ -831,6 +930,19 @@ func (r *runningStep) provideStartingInput(input map[string]any) error { if _, err := r.stepSchema.Input().Unserialize(input["input"]); err != nil { return err } + + // Now get the other necessary data for running the step + var timeout int64 + if input["closure_wait_timeout"] != nil { + unserializedTimeout, err := closureTimeoutSchema().Unserialize(input["closure_wait_timeout"]) + if err != nil { + return err + } + timeout = unserializedTimeout.(int64) + } else { + timeout = defaultClosureTimeout + } + // Make sure we transition the state before unlocking so there are no race conditions. r.runInputAvailable = true @@ -839,7 +951,10 @@ func (r *runningStep) provideStartingInput(input map[string]any) error { // Feed the run step its input over the channel. select { - case r.runInput <- input["input"]: + case r.runInput <- runInput{ + stepInputData: input["input"], + forceCloseTimeoutMS: timeout, + }: default: return fmt.Errorf("unable to provide input to run stage for step %s/%s", r.runID, r.pluginStepID) } @@ -849,12 +964,25 @@ func (r *runningStep) provideStartingInput(input map[string]any) error { func (r *runningStep) provideCancelledInput(input map[string]any) error { // Note: The calling function must have the step mutex locked // Cancel if the step field is present and isn't false - if input["stop_if"] != false && input["stop_if"] != nil { + if input["stop_if"] == nil { + return nil + } + if input["stop_if"] != false { + r.cancelled = true r.cancelStep() } return nil } +func (r *runningStep) hasCancellationHandler() bool { + handler, present := r.stepSchema.SignalHandlers()[plugin.CancellationSignalSchema.ID()] + return present && handler != nil +} + +func (r *runningStep) getCancellationHandler() *schema.SignalSchema { + return r.stepSchema.SignalHandlers()[plugin.CancellationSignalSchema.ID()] +} + // cancelStep gracefully requests cancellation for any stage. // If running, it sends a cancel signal if the plugin supports it. func (r *runningStep) cancelStep() { @@ -863,11 +991,14 @@ func (r *runningStep) cancelStep() { // If it isn't, cancelling the context alone should be enough. if r.currentStage == StageIDRunning { // Verify that the step has a cancel signal - cancelSignal := r.stepSchema.SignalHandlers()[plugin.CancellationSignalSchema.ID()] - if cancelSignal == nil { + if !r.hasCancellationHandler() { r.logger.Errorf("could not cancel step %s/%s. Does not contain cancel signal receiver.", r.runID, r.pluginStepID) - } else if err := plugin.CancellationSignalSchema.DataSchema().ValidateCompatibility(cancelSignal.DataSchema()); err != nil { + } + cancelSignal := r.getCancellationHandler() + if err := plugin.CancellationSignalSchema.DataSchema().ValidateCompatibility(cancelSignal.DataSchema()); err != nil { r.logger.Errorf("validation failed for cancel signal for step %s/%s: %s", r.runID, r.pluginStepID, err) + } else if r.signalToStep == nil { + r.logger.Debugf("signal send channel closed; the step %s/%s likely finished", r.runID, r.pluginStepID) } else { // Validated. Now call the signal. r.signalToStep <- schema.Input{RunID: r.runID, ID: cancelSignal.ID(), InputData: map[any]any{}} @@ -881,26 +1012,56 @@ func (r *runningStep) cancelStep() { // Warning: This means that it won't wait for the ATP client to finish. This is okay if using a deployer that // will stop execution once the deployer closes it. func (r *runningStep) ForceClose() error { - err := r.closeComponents(false) + closedAlready := r.closed.Swap(true) + if closedAlready { + r.wg.Wait() + return nil + } + err := r.forceClose() + if err != nil { + return err + } // Wait for the run to finish to ensure that it's not running after closing. r.wg.Wait() - r.closed = true r.logger.Warningf("Step %s/%s force closed.", r.runID, r.pluginStepID) + return nil +} + +// This is necessary so that the waitgroup's Wait() function is not called by a function whose +// completion is required to end the wait. +func (r *runningStep) forceCloseInternal() error { + closedAlready := r.closed.Swap(true) + if closedAlready { + return nil + } + return r.forceClose() +} + +func (r *runningStep) forceClose() error { + r.cancel() + err := r.closeComponents(false) return err } func (r *runningStep) Close() error { + closedAlready := r.closed.Swap(true) + if closedAlready { + r.wg.Wait() + return nil + } + r.cancel() err := r.closeComponents(true) // Wait for the run to finish to ensure that it's not running after closing. r.wg.Wait() - r.closed = true + r.closed.Store(true) return err } func (r *runningStep) closeComponents(closeATP bool) error { r.cancel() r.lock.Lock() - if r.closed { + if r.closed.Load() { + r.lock.Unlock() return nil // Already closed } var atpErr error @@ -922,52 +1083,88 @@ func (r *runningStep) closeComponents(closeATP bool) error { return nil } +// Note: Caller must add 1 to the waitgroup before calling. func (r *runningStep) run() { - r.wg.Add(1) // Wait for the run to finish before closing. defer func() { r.cancel() // Close before WaitGroup done r.wg.Done() // Done. Close may now exit. }() - container, err := r.deployStage() - if err != nil { - r.deployFailed(err) + pluginConnection := r.startPlugin() + if pluginConnection == nil { return } + defer func() { + err := pluginConnection.Close() + if err != nil { + r.logger.Errorf("failed to close deployed container for step %s/%s", r.runID, r.pluginStepID) + } + }() + r.postDeployment(pluginConnection) +} + +// Deploys the plugin, and handles failure cases. +func (r *runningStep) startPlugin() deployer.Plugin { + pluginConnection, contextDoneEarly, err := r.deployStage() + if contextDoneEarly { + if err != nil { + r.logger.Debugf("error due to step early closure: %s", err.Error()) + } + r.closedEarly(StageIDEnabling, true) + return nil + } else if err != nil { + r.deployFailed(err) + return nil + } r.lock.Lock() select { case <-r.ctx.Done(): - if err := container.Close(); err != nil { - r.logger.Warningf("failed to remove deployed container for step %s/%s", r.runID, r.pluginStepID) + if err := pluginConnection.Close(); err != nil { + r.logger.Warningf("failed to close deployed container for step %s/%s", r.runID, r.pluginStepID) } r.lock.Unlock() - r.transitionToCancelled() - return + r.closedEarly(StageIDEnabling, false) + return nil default: - r.container = container + r.container = pluginConnection } r.lock.Unlock() - r.logger.Debugf("Successfully deployed container with ID '%s' for step %s/%s", container.ID(), r.runID, r.pluginStepID) + r.logger.Debugf("Successfully deployed container with ID '%s' for step %s/%s", pluginConnection.ID(), r.runID, r.pluginStepID) + return pluginConnection +} +func (r *runningStep) postDeployment(pluginConnection deployer.Plugin) { r.logger.Debugf("Checking to see if step %s/%s is enabled", r.runID, r.pluginStepID) - enabled, err := r.enableStage() - if err != nil { - r.startFailed(err) + enabled, contextDoneEarly := r.enableStage() + if contextDoneEarly { + r.closedEarly(StageIDStarting, true) + return } r.logger.Debugf("Step %s/%s enablement state: %t", r.runID, r.pluginStepID, enabled) if !enabled { r.transitionToDisabled() return } - if err := r.startStage(container); err != nil { + + var forceCloseTimeoutMS int64 + var err error + if contextDoneEarly, forceCloseTimeoutMS, err = r.startStage(pluginConnection); contextDoneEarly { + r.closedEarly(StageIDRunning, true) + return + } else if err != nil { r.startFailed(err) return } - if err := r.runStage(); err != nil { + if err := r.runStage(forceCloseTimeoutMS); err != nil { r.runFailed(err) } } -func (r *runningStep) deployStage() (deployer.Plugin, error) { +// deployStage deploys the step. +// Return types: +// - deployer.Plugin: The deployer's connection to the plugin. +// - bool: True if the context was done, causing early failure. +// - err: The error, if a failure occurred. +func (r *runningStep) deployStage() (deployer.Plugin, bool, error) { r.logger.Debugf("Deploying stage for step %s/%s", r.runID, r.pluginStepID) r.lock.Lock() r.state = step.RunningStepStateRunning @@ -1003,7 +1200,7 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) { r.state = step.RunningStepStateRunning r.lock.Unlock() case <-r.ctx.Done(): - return nil, fmt.Errorf("step closed before deployment config could be obtained") + return nil, true, nil } } r.lock.Lock() @@ -1016,18 +1213,21 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) { stepDeployer, err = r.deployerRegistry.Create(r.deploymentType, deployerConfig, r.logger.WithLabel("source", "deployer")) if err != nil { - return nil, err + return nil, false, err } } container, err := stepDeployer.Deploy(r.ctx, r.source) if err != nil { - return nil, err + return nil, false, err } - return container, nil + return container, false, nil } // enableStage returns the result of whether the stage was enabled or not. -func (r *runningStep) enableStage() (bool, error) { +// Return values: +// - bool: Whether the step was enabled. +// - bool: True if the step was disabled due to context done. +func (r *runningStep) enableStage() (bool, bool) { r.lock.Lock() previousStage := string(r.currentStage) r.currentStage = StageIDEnabling @@ -1045,23 +1245,34 @@ func (r *runningStep) enableStage() (bool, error) { &r.wg, ) + var enabled bool select { - case enabled := <-r.enabledInput: - return enabled, nil + case enabled = <-r.enabledInput: case <-r.ctx.Done(): - return false, fmt.Errorf("step closed while determining enablement status") + return false, true + } + + if enabled { + // It's enabled, so the disabled stage will not occur. + r.stageChangeHandler.OnStepStageFailure(r, string(StageIDDisabled), &r.wg, fmt.Errorf("step enabled; cannot be disabled anymore")) } + return enabled, false } -func (r *runningStep) startStage(container deployer.Plugin) error { +// startStage gets the inputs and starts the step. +// Return values: +// - bool: True if the step was closed early due to context done. +// - int64: the amount of milliseconds to wait to force terminate a step. +// - error: Any error that occurred while trying to start the step. +func (r *runningStep) startStage(container deployer.Plugin) (bool, int64, error) { r.logger.Debugf("Starting stage for step %s/%s", r.runID, r.pluginStepID) - atpClient := atp.NewClientWithLogger(container, r.logger) + atpClient := atp.NewClientWithLogger(container, r.logger.WithLabel("source", "atp-client")) var inputReceivedEarly bool r.lock.Lock() r.atpClient = atpClient r.lock.Unlock() - var runInput any + var runInput runInput var newState step.RunningStepState select { case runInput = <-r.runInput: @@ -1100,41 +1311,51 @@ func (r *runningStep) startStage(container deployer.Plugin) error { r.state = step.RunningStepStateRunning r.lock.Unlock() case <-r.ctx.Done(): - return fmt.Errorf("step closed while waiting for run configuration") + r.logger.Debugf("step closed while waiting for run configuration") + return true, 0, nil } } inputSchema, err := r.atpClient.ReadSchema() if err != nil { - return err + return false, 0, err } steps := inputSchema.Steps() stepSchema, ok := steps[r.pluginStepID] if !ok { - return fmt.Errorf("error in run step %s: schema mismatch between local and remote deployed plugin, no stepSchema named %s found in remote", r.runID, r.pluginStepID) + return false, 0, fmt.Errorf("error in run step %s: schema mismatch between local and remote deployed plugin, no stepSchema named %s found in remote", r.runID, r.pluginStepID) } // Re-verify input. This should have also been done earlier. - if _, err := stepSchema.Input().Unserialize(runInput); err != nil { - return fmt.Errorf("schema mismatch between local and remote deployed plugin in step %s/%s, unserializing input failed (%w)", r.runID, r.pluginStepID, err) + if _, err := stepSchema.Input().Unserialize(runInput.stepInputData); err != nil { + return false, 0, fmt.Errorf("schema mismatch between local and remote deployed plugin in step %s/%s, unserializing input failed (%w)", r.runID, r.pluginStepID, err) } + r.wg.Add(1) + // Runs the ATP client in a goroutine in order to wait for it. - // On context done, the deployer has 30 seconds before it will error out. + // On context done, the deployer has limited time before it will error out. go func() { + defer r.wg.Done() result := r.atpClient.Execute( - schema.Input{RunID: r.runID, ID: r.pluginStepID, InputData: runInput}, + schema.Input{RunID: r.runID, ID: r.pluginStepID, InputData: runInput.stepInputData}, r.signalToStep, r.signalFromStep, ) + r.lock.Lock() + // The sender should be the one to close the signal send channel + channel := r.signalToStep + r.signalToStep = nil + close(channel) + r.lock.Unlock() r.executionChannel <- result if err = r.atpClient.Close(); err != nil { r.logger.Warningf("Error while closing ATP client: %s", err) } }() - return nil + return false, runInput.forceCloseTimeoutMS, nil } -func (r *runningStep) runStage() error { +func (r *runningStep) runStage(forceCloseTimeoutMS int64) error { r.logger.Debugf("Running stage for step %s/%s", r.runID, r.pluginStepID) startedOutput := any(map[any]any{}) r.transitionStageWithOutput(StageIDRunning, step.RunningStepStateRunning, schema.PointerTo("started"), &startedOutput) @@ -1142,36 +1363,73 @@ func (r *runningStep) runStage() error { var result atp.ExecutionResult select { case result = <-r.executionChannel: - if result.Error != nil { - return result.Error - } case <-r.ctx.Done(): - // In this case, it is being instructed to stop. A signal should have been sent. - // Shutdown (with sigterm) the container, then wait for the output (valid or error). - r.logger.Debugf("Got step context done before step run complete. Waiting up to 30 seconds for result.") + // In this case, it is being instructed to stop. A cancellation signal should be sent if supported. + if r.hasCancellationHandler() { + r.logger.Debugf("Got step context done before step run complete. Sending cancellation signal. Waiting up to %d milliseconds for result.", forceCloseTimeoutMS) + r.lock.Lock() + r.cancelStep() + r.lock.Unlock() + } else { + r.logger.Warningf("Got step context done before step run complete. Force closing step %s/%s.", r.runID, r.pluginStepID) + err := r.forceCloseInternal() + return fmt.Errorf("step forced closed after timeout without result (%w)", err) + } + // Wait for cancellation to occur. select { case result = <-r.executionChannel: // Successfully stopped before end of timeout. - case <-time.After(time.Duration(30) * time.Second): - r.logger.Warningf("Step %s/%s did not complete within the 30 second time limit. Force closing container.", - r.runID, r.pluginStepID) - if err := r.ForceClose(); err != nil { - r.logger.Warningf("Error in step %s/%s while closing plugin container (%w)", r.runID, r.pluginStepID, err) + case <-time.After(time.Duration(forceCloseTimeoutMS) * time.Millisecond): + r.logger.Warningf("Cancelled step %s/%s did not complete within the %d millisecond time limit. Force closing container.", + r.runID, r.pluginStepID, forceCloseTimeoutMS) + if err := r.forceCloseInternal(); err != nil { + r.logger.Warningf("Error in step %s/%s while closing plugin container (%s)", r.runID, r.pluginStepID, err.Error()) + return fmt.Errorf("step closed after timeout without result with error (%s)", err.Error()) } + return fmt.Errorf("step closed after timeout without result") } + } + if result.Error != nil { + return result.Error } // Execution complete, move to state running stage outputs, then to state finished stage. - r.transitionStage(StageIDOutput, step.RunningStepStateRunning) + r.transitionRunningStage(StageIDOutput) r.completeStep(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData) return nil } +func (r *runningStep) markStageFailures(firstStage StageID, err error) { + switch firstStage { + case StageIDEnabling: + r.stageChangeHandler.OnStepStageFailure(r, string(StageIDEnabling), &r.wg, err) + fallthrough + case StageIDDisabled: + r.stageChangeHandler.OnStepStageFailure(r, string(StageIDDisabled), &r.wg, err) + fallthrough + case StageIDStarting: + r.stageChangeHandler.OnStepStageFailure(r, string(StageIDStarting), &r.wg, err) + fallthrough + case StageIDRunning: + r.stageChangeHandler.OnStepStageFailure(r, string(StageIDRunning), &r.wg, err) + fallthrough + case StageIDOutput: + r.stageChangeHandler.OnStepStageFailure(r, string(StageIDOutput), &r.wg, err) + default: + panic("unknown StageID") + } +} + +// Closable is the graceful case, so this is necessary if it crashes. +func (r *runningStep) markNotClosable(err error) { + r.stageChangeHandler.OnStepStageFailure(r, string(StageIDClosed), &r.wg, err) +} + func (r *runningStep) deployFailed(err error) { r.logger.Debugf("Deploy failed stage for step %s/%s", r.runID, r.pluginStepID) - r.transitionStage(StageIDDeployFailed, step.RunningStepStateRunning) + r.transitionRunningStage(StageIDDeployFailed) r.logger.Warningf("Plugin step %s/%s deploy failed. %v", r.runID, r.pluginStepID, err) // Now it's done. @@ -1180,14 +1438,10 @@ func (r *runningStep) deployFailed(err error) { Error: err.Error(), }) r.completeStep(StageIDDeployFailed, step.RunningStepStateFinished, &outputID, &output) -} - -func (r *runningStep) transitionToCancelled() { - r.logger.Infof("Step %s/%s cancelled", r.runID, r.pluginStepID) - // Follow the convention of transitioning to running then finished. - r.transitionStage(StageIDCancelled, step.RunningStepStateRunning) - // Cancelled currently has no output. - r.transitionStage(StageIDCancelled, step.RunningStepStateFinished) + // If deployment fails, enabling, disabled, starting, running, and output cannot occur. + err = fmt.Errorf("deployment failed for step %s/%s", r.runID, r.pluginStepID) + r.markStageFailures(StageIDEnabling, err) + r.markNotClosable(err) } func (r *runningStep) transitionToDisabled() { @@ -1207,11 +1461,35 @@ func (r *runningStep) transitionToDisabled() { schema.PointerTo("output"), &disabledOutput, ) + + err := fmt.Errorf("step %s/%s disabled", r.runID, r.pluginStepID) + r.markStageFailures(StageIDStarting, err) + r.markNotClosable(err) +} + +func (r *runningStep) closedEarly(stageToMarkUnresolvable StageID, priorStageFailed bool) { + r.logger.Infof("Step %s/%s closed", r.runID, r.pluginStepID) + // Follow the convention of transitioning to running then finished. + if priorStageFailed { + r.transitionFromFailedStage(StageIDClosed, step.RunningStepStateRunning, fmt.Errorf("step closed early")) + } else { + r.transitionRunningStage(StageIDClosed) + } + closedOutput := any(map[any]any{"cancelled": r.cancelled, "close_requested": r.closed.Load()}) + + r.completeStep( + StageIDClosed, + step.RunningStepStateFinished, + schema.PointerTo("result"), + &closedOutput, + ) + + err := fmt.Errorf("step %s/%s closed due to workflow termination", r.runID, r.pluginStepID) + r.markStageFailures(stageToMarkUnresolvable, err) } func (r *runningStep) startFailed(err error) { - r.logger.Debugf("Start failed stage for step %s/%s", r.runID, r.pluginStepID) - r.transitionStage(StageIDCrashed, step.RunningStepStateRunning) + r.transitionFromFailedStage(StageIDCrashed, step.RunningStepStateRunning, err) r.logger.Warningf("Plugin step %s/%s start failed. %v", r.runID, r.pluginStepID, err) // Now it's done. @@ -1221,11 +1499,12 @@ func (r *runningStep) startFailed(err error) { }) r.completeStep(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output) + r.markStageFailures(StageIDRunning, err) + r.markNotClosable(err) } func (r *runningStep) runFailed(err error) { - r.logger.Debugf("Run failed stage for step %s/%s", r.runID, r.pluginStepID) - r.transitionStage(StageIDCrashed, step.RunningStepStateRunning) + r.transitionRunningStage(StageIDCrashed) r.logger.Warningf("Plugin step %s/%s run failed. %v", r.runID, r.pluginStepID, err) // Now it's done. @@ -1234,15 +1513,39 @@ func (r *runningStep) runFailed(err error) { Output: err.Error(), }) r.completeStep(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output) + r.markStageFailures(StageIDOutput, err) + r.markNotClosable(err) } -// TransitionStage transitions the stage to the specified stage, and the state to the specified state. -func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepState) { - r.transitionStageWithOutput(newStage, state, nil, nil) +// TransitionStage transitions the running step to the specified stage, and the state running. +// For other situations, use transitionFromFailedStage, completeStep, or transitionStageWithOutput. +func (r *runningStep) transitionRunningStage(newStage StageID) { + r.transitionStageWithOutput(newStage, step.RunningStepStateRunning, nil, nil) +} + +func (r *runningStep) transitionFromFailedStage(newStage StageID, state step.RunningStepState, err error) { + r.lock.Lock() + previousStage := string(r.currentStage) + r.currentStage = newStage + // Don't forget to update this, or else it will behave very oddly. + // First running, then finished. You can't skip states. + r.state = state + r.lock.Unlock() + r.stageChangeHandler.OnStepStageFailure( + r, + previousStage, + &r.wg, + err, + ) } // TransitionStage transitions the stage to the specified stage, and the state to the specified state. -func (r *runningStep) transitionStageWithOutput(newStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) { +func (r *runningStep) transitionStageWithOutput( + newStage StageID, + state step.RunningStepState, + outputID *string, + previousStageOutput *any, +) { // A current lack of observability into the atp client prevents // non-fragile testing of this function. r.lock.Lock() diff --git a/internal/step/plugin/provider_test.go b/internal/step/plugin/provider_test.go index 84a523e0..e946f000 100644 --- a/internal/step/plugin/provider_test.go +++ b/internal/step/plugin/provider_test.go @@ -46,6 +46,10 @@ func (s *deployFailStageChangeHandler) OnStepComplete( s.message <- message } +func (s *deployFailStageChangeHandler) OnStepStageFailure(_ step.RunningStep, _ string, _ *sync.WaitGroup, _ error) { + +} + type startFailStageChangeHandler struct { message chan string } @@ -79,6 +83,10 @@ func (s *startFailStageChangeHandler) OnStepComplete( s.message <- message } +func (s *startFailStageChangeHandler) OnStepStageFailure(_ step.RunningStep, _ string, _ *sync.WaitGroup, _ error) { + +} + type stageChangeHandler struct { message chan string } @@ -111,6 +119,10 @@ func (s *stageChangeHandler) OnStepComplete( s.message <- message } +func (s *stageChangeHandler) OnStepStageFailure(_ step.RunningStep, _ string, _ *sync.WaitGroup, _ error) { + +} + func TestProvider_MissingDeployer(t *testing.T) { logger := log.New( log.Config{ diff --git a/internal/step/provider.go b/internal/step/provider.go index b8722486..5e45a63c 100644 --- a/internal/step/provider.go +++ b/internal/step/provider.go @@ -56,6 +56,15 @@ type StageChangeHandler interface { previousStageOutput *any, wg *sync.WaitGroup, ) + + // OnStepStageFailure is called when it becomes known that the step's stage will not produce an output. + // The error is optional. + OnStepStageFailure( + step RunningStep, + stage string, + wg *sync.WaitGroup, + err error, + ) } // RunnableStep is a step that already has a schema and can be run. diff --git a/workflow/executor.go b/workflow/executor.go index b45578ea..ce446ef8 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -403,6 +403,8 @@ func addScopesWithReferences(allNamespaces map[string]map[string]*schema.ObjectS } // connectStepDependencies connects the steps based on their expressions. +// +//nolint:gocognit func (e *executor) connectStepDependencies( workflow *Workflow, workflowContext map[string][]byte, @@ -418,8 +420,12 @@ func (e *executor) connectStepDependencies( if err != nil { return fmt.Errorf("bug: node for current stage not found (%w)", err) } - for _, nextStage := range stage.NextStages { - if err := currentStageNode.Connect(GetStageNodeID(stepID, nextStage)); err != nil { + for nextStage, dependencyType := range stage.NextStages { + nextStageNode, err := dag.GetNodeByID(GetStageNodeID(stepID, nextStage)) + if err != nil { + return fmt.Errorf("bug: node for next stage not found (%w)", err) + } + if err := nextStageNode.ConnectDependency(currentStageNode.ID(), dependencyType); err != nil { return fmt.Errorf("bug: cannot connect nodes (%w)", err) } } diff --git a/workflow/model.go b/workflow/model.go index 54ccba15..d5e3adc4 100644 --- a/workflow/model.go +++ b/workflow/model.go @@ -2,12 +2,10 @@ package workflow import ( "fmt" - "regexp" - "strings" - "go.arcalot.io/dgraph" "go.flow.arcalot.io/engine/internal/step" "go.flow.arcalot.io/pluginsdk/schema" + "regexp" ) // Workflow is the primary data structure describing workflows. @@ -229,30 +227,23 @@ type ErrNoMorePossibleSteps struct { // Error returns an explanation on why the error happened. func (e ErrNoMorePossibleSteps) Error() string { - var outputsUnmetDependencies []string //nolint:prealloc - for _, node := range e.dag.ListNodes() { - if node.Item().Kind != DAGItemKindOutput { - continue - } - var unmetDependencies []string - inbound, err := node.ListInboundConnections() - if err != nil { - panic(fmt.Errorf("failed to fetch output node inbound dependencies (%w)", err)) - } - for i := range inbound { - unmetDependencies = append(unmetDependencies, i) - } - outputsUnmetDependencies = append( - outputsUnmetDependencies, - fmt.Sprintf("%s: %s", node.Item().OutputID, strings.Join(unmetDependencies, ", ")), - ) - } return fmt.Sprintf( - "no steps running, no more executable steps, cannot construct any output (outputs have the following dependencies: %s)", - strings.Join(outputsUnmetDependencies, "; "), + "no steps running, no more executable steps; cannot construct any output." + + " this is the fallback system, indicating a failure of the output resolution system", ) } +// ErrNoMorePossibleOutputs indicates that the workflow has terminated due to it being impossible to resolve an output. +// This means that steps that the output(s) depended on did not have the required results. +type ErrNoMorePossibleOutputs struct { + dag dgraph.DirectedGraph[*DAGItem] +} + +// Error returns an explanation on why the error happened. +func (e ErrNoMorePossibleOutputs) Error() string { + return "all outputs marked as unresolvable" +} + // ErrInvalidState indicates that the workflow failed due to an invalid state. type ErrInvalidState struct { processingSteps int diff --git a/workflow/model_test.go b/workflow/model_test.go index 03556be5..df34f3b7 100644 --- a/workflow/model_test.go +++ b/workflow/model_test.go @@ -6,7 +6,7 @@ import ( "testing" ) -var versionExp = "v0.2.0" +var testVersionExp = "v0.2.0" var inputExp = map[string]any{ "root": "RootObject", "objects": map[string]any{ @@ -24,9 +24,9 @@ var stepsExp = map[string]any{ "input": map[string]any{ "wait_time_ms": 1}}, } -var outputID = "success" +var testExpectedOutputID = "success" var outputsExp = map[string]any{ - outputID: "!expr $.steps.long_wait.outputs", + testExpectedOutputID: "!expr $.steps.long_wait.outputs", } var outputSchemaRootID = "RootObjectOut" var stepOutputSchemaInput = map[string]any{ @@ -42,10 +42,10 @@ var stepOutputSchemaInput = map[string]any{ }}}}}}, } var outputSchemaInput = map[string]any{ - outputID: stepOutputSchemaInput, + testExpectedOutputID: stepOutputSchemaInput, } var workflowSchemaInput = map[string]any{ - "version": versionExp, + "version": testVersionExp, "input": inputExp, "steps": stepsExp, "outputs": outputsExp, diff --git a/workflow/workflow.go b/workflow/workflow.go index 7fb0bef7..9a0a0a33 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -82,6 +82,12 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) ( // We use an internal cancel function to abort the workflow if something bad happens. ctx, cancel := context.WithCancel(ctx) defer cancel() + outputNodes := make(map[string]dgraph.Node[*DAGItem]) + for _, node := range e.dag.ListNodes() { + if node.Item().Kind == DAGItemKindOutput { + outputNodes[node.ID()] = node + } + } l := &loopState{ logger: e.logger.WithLabel("source", "workflow"), @@ -96,9 +102,12 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) ( runningSteps: make(map[string]step.RunningStep, len(e.dag.ListNodes())), outputDataChannel: make(chan outputDataType, 1), outputDone: false, + waitingOutputs: outputNodes, + context: ctx, cancel: cancel, workflowContext: e.workflowContext, recentErrors: make(chan error, 20), // Big buffer in case there are lots of subsequent errors. + lifecycles: e.lifecycles, } l.lock.Lock() @@ -133,9 +142,8 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) ( if !inputAvailable { waitingForInputText = " and is waiting for input" } - e.logger.Debugf("START Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) + e.logger.Debugf("Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) l.onStageComplete(stepID, previousStage, previousStageOutputID, previousStageOutput, wg) - e.logger.Debugf("DONE Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) }, onStepComplete: func( step step.RunningStep, @@ -151,6 +159,18 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) ( } l.onStageComplete(stepID, &previousStage, previousStageOutputID, previousStageOutput, wg) }, + onStepStageFailure: func(step step.RunningStep, stage string, wg *sync.WaitGroup, err error) { + if err == nil { + e.logger.Debugf("Step %q stage %q declared that it will not produce an output", stepID, stage) + } else { + e.logger.Debugf("Step %q stage %q declared that it will not produce an output (%s)", stepID, stage, err.Error()) + } + l.lock.Lock() + defer l.lock.Unlock() + l.markOutputsUnresolvable(stepID, stage, nil) + l.markStageNodeUnresolvable(stepID, stage) + l.notifySteps() + }, } e.logger.Debugf("Launching step %s...", stepID) runningStep, err := runnableStep.Start(e.stepRunData[stepID], stepID, stageHandler) @@ -161,15 +181,7 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) ( } l.lock.Unlock() // Let's make sure we are closing all steps once this function terminates so we don't leave stuff running. - defer func() { - e.logger.Debugf("Terminating all steps...") - for stepID, runningStep := range l.runningSteps { - e.logger.Debugf("Terminating step %s...", stepID) - if err := runningStep.ForceClose(); err != nil { - panic(fmt.Errorf("failed to close step %s (%w)", stepID, err)) - } - } - }() + defer l.terminateAllSteps() // We remove the input node from the DAG and call the notifySteps function once to trigger the workflow // start. @@ -197,36 +209,80 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) ( select { case outputDataEntry, ok := <-l.outputDataChannel: if !ok { - return "", nil, fmt.Errorf("output data channel unexpectedly closed") + lastErrors := l.handleErrors() + return "", nil, + fmt.Errorf("output data channel unexpectedly closed. %w", lastErrors) } - e.logger.Debugf("Output complete.") - outputID = outputDataEntry.outputID - outputData = outputDataEntry.outputData - outputSchema, ok := e.outputSchema[outputID] - if !ok { - return "", nil, fmt.Errorf( - "bug: no output named '%s' found in output schema", - outputID, - ) - } - _, err := outputSchema.Unserialize(outputDataEntry.outputData) - if err != nil { - return "", nil, fmt.Errorf( - "bug: output schema cannot unserialize output data (%w)", - err, - ) - } - return outputDataEntry.outputID, outputData, nil + return e.handleOutput(l, outputDataEntry) case <-ctx.Done(): - lastErr := l.getLastError() - e.logger.Debugf("Workflow execution aborted. %s", lastErr) - if lastErr != nil { - return "", nil, lastErr + lastErrors := l.handleErrors() + if lastErrors == nil { + e.logger.Warningf( + "Workflow execution aborted. Waiting for output before terminating (%w)", + lastErrors) + } else { + e.logger.Debugf("Workflow execution exited with error after context done") + return "", nil, lastErrors } - return "", nil, fmt.Errorf("workflow execution aborted (%w)", ctx.Err()) + timedContext, cancelFunction := context.WithTimeout(context.Background(), 5*time.Second) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + l.terminateAllSteps() + }() + defer wg.Wait() + defer cancelFunction() + select { + case outputDataEntry, ok := <-l.outputDataChannel: + if !ok { + lastErrors := l.handleErrors() + return "", nil, + fmt.Errorf( + "output data channel unexpectedly closed while waiting after execution aborted (%w)", + lastErrors) + } + return e.handleOutput(l, outputDataEntry) + case err := <-l.recentErrors: // The context is done, so instead just check for errors. + // Put it back in the channel + l.recentErrors <- err + lastErrors := l.handleErrors() + l.logger.Errorf("workflow failed with error %s", err.Error()) + return "", nil, lastErrors + case <-timedContext.Done(): + lastErrors := l.handleErrors() + return "", nil, fmt.Errorf("workflow execution aborted (%w) (%s)", ctx.Err(), lastErrors.Error()) + } + } } +func (e *executableWorkflow) handleOutput(l *loopState, outputDataEntry outputDataType) (outputID string, outputData any, err error) { + lastErrors := l.handleErrors() + if lastErrors != nil { + e.logger.Warningf("output completed with errors (%s)", lastErrors.Error()) + } else { + e.logger.Debugf("output complete with output ID %s", outputDataEntry.outputID) + } + outputID = outputDataEntry.outputID + outputData = outputDataEntry.outputData + outputSchema, ok := e.outputSchema[outputID] + if !ok { + return "", nil, fmt.Errorf( + "bug: no output named '%s' found in output schema (%w)", + outputID, lastErrors, + ) + } + _, err = outputSchema.Unserialize(outputDataEntry.outputData) + if err != nil { + return "", nil, fmt.Errorf( + "bug: output schema cannot unserialize output data (%s) (%w)", + err.Error(), lastErrors, + ) + } + return outputDataEntry.outputID, outputData, nil +} + type outputDataType struct { outputID string outputData any @@ -245,35 +301,72 @@ type loopState struct { runningSteps map[string]step.RunningStep outputDataChannel chan outputDataType outputDone bool - recentErrors chan error - cancel context.CancelFunc - workflowContext map[string][]byte + // waitingOutputs keeps track of all workflow output nodes to know when the workflow fails. + waitingOutputs map[string]dgraph.Node[*DAGItem] + context context.Context + recentErrors chan error + cancel context.CancelFunc + workflowContext map[string][]byte + lifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema] +} + +func (l *loopState) terminateAllSteps() { + l.logger.Debugf("Terminating all steps...") + for stepID, runningStep := range l.runningSteps { + l.logger.Debugf("Terminating step %s...", stepID) + if err := runningStep.ForceClose(); err != nil { + panic(fmt.Errorf("failed to close step %s (%w)", stepID, err)) + } + } } -// getLastError gathers the last errors. If there are several, it creates a new one that consolidates them. +// getLastError gathers the last errors. If there are several, it creates a new one that consolidates +// all non-duplicate ones. // This will read from the channel. Calling again will only gather new errors since the last call. func (l *loopState) getLastError() error { - var errors []error + errors := map[string]error{} errGatherLoop: for { select { case err := <-l.recentErrors: - errors = append(errors, err) + errors[err.Error()] = err default: break errGatherLoop // No more errors } } switch len(errors) { case 0: - return nil + fallthrough case 1: - return errors[0] + for _, err := range errors { + return err + } + return nil default: - return fmt.Errorf("multiple errors: %v", errors) + errorsAsString := "" + for errStr := range errors { + errorsAsString += " " + errStr + } + return fmt.Errorf("multiple errors:%s", errorsAsString) } } -func (l *loopState) onStageComplete(stepID string, previousStage *string, previousStageOutputID *string, previousStageOutput *any, wg *sync.WaitGroup) { +func (l *loopState) handleErrors() error { + lastErr := l.getLastError() + if lastErr != nil { + l.logger.Warningf("Workflow execution aborted with error: %s", lastErr.Error()) + return lastErr + } + return nil +} + +func (l *loopState) onStageComplete( + stepID string, + previousStage *string, + previousStageOutputID *string, + previousStageOutput *any, + wg *sync.WaitGroup, +) { l.lock.Lock() defer func() { if previousStage != nil { @@ -292,10 +385,10 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo l.cancel() return } - l.logger.Debugf("Resolving node %q in the DAG", stageNode.ID()) + l.logger.Debugf("Resolving node %q in the DAG on stage complete", stageNode.ID()) if err := stageNode.ResolveNode(dgraph.Resolved); err != nil { - l.logger.Errorf("Failed to resolve stage node ID %s (%w)", stageNode.ID(), err) - l.recentErrors <- fmt.Errorf("failed to resolve stage node ID %s (%w)", stageNode.ID(), err) + errMessage := fmt.Errorf("failed to resolve stage node ID %s (%s)", stageNode.ID(), err.Error()) + l.recentErrors <- errMessage l.cancel() return } @@ -309,7 +402,7 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo } // Resolves the node in the DAG. This allows us to know which nodes are // ready for processing due to all dependencies being resolved. - l.logger.Debugf("Resolving node %q in the DAG", outputNode.ID()) + l.logger.Debugf("Resolving output node %q in the DAG", outputNode.ID()) if err := outputNode.ResolveNode(dgraph.Resolved); err != nil { l.logger.Errorf("Failed to resolve output node ID %s (%w)", outputNode.ID(), err) l.recentErrors <- fmt.Errorf("failed to resolve output node ID %s (%w)", outputNode.ID(), err) @@ -327,6 +420,9 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo *previousStageOutput, ) } + // Mark all alternative output ID nodes as unresolvable. + // Use the lifecycle to find the possible output IDs + l.markOutputsUnresolvable(stepID, *previousStage, previousStageOutputID) // Placing data from the output into the general data structure l.data[WorkflowStepsKey].(map[string]any)[stepID].(map[string]any)[*previousStage] = map[string]any{} @@ -335,6 +431,57 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo l.notifySteps() } +// Marks the outputs of that stage unresolvable. +// Optionally skip an output that should not unresolve. +// Does not (un)resolve the non-output node for that stage. +// To prevent a deadlock, `notifySteps()` should be called at some point after this is called. +func (l *loopState) markOutputsUnresolvable(stepID string, stageID string, skippedOutput *string) { + stages := l.lifecycles[stepID].Stages + for _, stage := range stages { + if stage.ID != stageID { + continue + } + for stageOutputID := range stage.Outputs { + if skippedOutput != nil && stageOutputID == *skippedOutput { + continue + } + unresolvableOutputNode, err := l.dag.GetNodeByID(GetOutputNodeID(stepID, stage.ID, stageOutputID)) + if err != nil { + l.logger.Warningf("Could not get DAG node %s (%s)", stepID+"."+stage.ID+"."+stageOutputID, err.Error()) + continue + } + l.logger.Debugf("Will mark node %s in the DAG as unresolvable", stepID+"."+stage.ID+"."+stageOutputID) + err = unresolvableOutputNode.ResolveNode(dgraph.Unresolvable) + if err != nil { + panic(fmt.Errorf("error while marking node %s in DAG as unresolvable (%s)", unresolvableOutputNode.ID(), err.Error())) + } + } + } +} + +// Marks the stage node for the step's stage as unresolvable. +// Does not (un)resolve the outputs of that node. For that, call markOutputsUnresolvable() instead or +// in addition to calling this function. +// To prevent a deadlock, `notifySteps()` should be called at some point after this is called. +func (l *loopState) markStageNodeUnresolvable(stepID string, stageID string) { + unresolvableOutputNode, err := l.dag.GetNodeByID(GetStageNodeID(stepID, stageID)) + if err != nil { + l.logger.Warningf("Could not get DAG node %s (%s)", stepID+"."+stageID, err.Error()) + return + } + l.logger.Debugf("Will mark node %s in the DAG as unresolvable", stepID+"."+stageID) + err = unresolvableOutputNode.ResolveNode(dgraph.Unresolvable) + if err != nil { + panic( + fmt.Errorf( + "error while marking node %s in DAG as unresolvable (%s)", + unresolvableOutputNode.ID(), + err.Error(), + ), + ) + } +} + // notifySteps is a function we can call to go through all DAG nodes that are marked // ready and provides step inputs based on expressions. // The lock should be acquired by the caller before this is called. @@ -342,28 +489,45 @@ func (l *loopState) notifySteps() { //nolint:gocognit readyNodes := l.dag.PopReadyNodes() l.logger.Debugf("Currently %d DAG nodes are ready. Now processing them.", len(readyNodes)) - // Can include runnable nodes, nodes that cannot be resolved, and nodes that are not for running, like an input. + // Can include runnable nodes, nodes that cannot be resolved, and nodes that are not for running, like inputs. for nodeID, resolutionStatus := range readyNodes { - if resolutionStatus == dgraph.Unresolvable { - l.logger.Debugf("Disregarding node %q with resolution %q", nodeID, resolutionStatus) - continue - } - l.logger.Debugf("Processing step node %s", nodeID) + failed := resolutionStatus == dgraph.Unresolvable + l.logger.Debugf("Processing step node %s with resolution status %q", nodeID, resolutionStatus) node, err := l.dag.GetNodeByID(nodeID) if err != nil { panic(fmt.Errorf("failed to get node %s (%w)", nodeID, err)) } nodeItem := node.Item() + if failed { + if nodeItem.Kind == DAGItemKindOutput { + l.logger.Debugf("Output node %s failed", nodeID) + // Check to see if there are any remaining output nodes, and if there aren't, + // cancel the context. + delete(l.waitingOutputs, nodeID) + if len(l.waitingOutputs) == 0 && !l.outputDone { + l.recentErrors <- &ErrNoMorePossibleOutputs{ + l.dag, + } + l.cancel() + } + } else { + l.logger.Debugf("Disregarding failed node %s with type %s", nodeID, nodeItem.Kind) + } + continue + } // The data structure that the particular node requires. One or more fields. May or may not contain expressions. inputData := nodeItem.Data if inputData == nil { // No input data is needed. This is often the case for input nodes. continue } + // Resolve any expressions in the input data. // untypedInputData stores the resolved data untypedInputData, err := l.resolveExpressions(inputData, l.data) if err != nil { + // An error here often indicates a locking issue in a step provider. This could be caused + // by the lock not being held when the output was marked resolved. panic(fmt.Errorf("cannot resolve expressions for %s (%w)", nodeID, err)) } @@ -478,6 +642,7 @@ func (l *loopState) checkForDeadlocks(retries int, wg *sync.WaitGroup) { l.dag, } l.logger.Debugf("DAG:\n%s", l.dag.Mermaid()) + l.logger.Errorf("TERMINATING WORKFLOW; Errors below this error may be due to the early termination") l.cancel() } else { // Retry. There are times when all the steps are in a transition state. @@ -485,9 +650,16 @@ func (l *loopState) checkForDeadlocks(retries int, wg *sync.WaitGroup) { l.logger.Warningf("No running steps. Rechecking...") wg.Add(1) go func() { - time.Sleep(5 * time.Millisecond) - l.checkForDeadlocks(retries-1, wg) - wg.Done() + defer wg.Done() + select { + case <-time.After(time.Duration(5) * time.Millisecond): + time.Sleep(5 * time.Millisecond) + l.lock.Lock() + l.checkForDeadlocks(retries-1, wg) + l.lock.Unlock() + case <-l.context.Done(): + return + } }() } } @@ -548,6 +720,12 @@ type stageChangeHandler struct { previousStageOutput *any, wg *sync.WaitGroup, ) + onStepStageFailure func( + step step.RunningStep, + stage string, + wg *sync.WaitGroup, + err error, + ) } func (s stageChangeHandler) OnStageChange( @@ -572,6 +750,15 @@ func (s stageChangeHandler) OnStepComplete( s.onStepComplete(step, previousStage, previousStageOutputID, previousStageOutput, wg) } +func (s stageChangeHandler) OnStepStageFailure( + step step.RunningStep, + stage string, + wg *sync.WaitGroup, + err error, +) { + s.onStepStageFailure(step, stage, wg, err) +} + // PrintObjectNamespaceTable constructs and writes a tidy table of workflow // Objects and their namespaces to the given output destination. func PrintObjectNamespaceTable(output io.Writer, allNamespaces map[string]map[string]*schema.ObjectSchema, logger log.Logger) { diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 8c9f1527..8ba4053d 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -52,12 +52,44 @@ func TestOutputFailed(t *testing.T) { _, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{"name": "Arca Lot"}) assert.Nil(t, outputData) assert.Error(t, err) - var typedError *workflow.ErrNoMorePossibleSteps + + var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } } +var simpleValidLiteralInputWaitWorkflowDefinition = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + wait_1: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 +outputs: + a: + b: !expr $.steps.wait_1.outputs +` + +func TestSimpleValidWaitWorkflow(t *testing.T) { + // Just a single wait + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, simpleValidLiteralInputWaitWorkflowDefinition), + ) + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "a") +} + var stepCancellationWorkflowDefinition = ` version: v0.2.0 input: @@ -83,7 +115,7 @@ steps: input: # It needs to be long enough for it to ensure that long_wait is in a running state. # The other case will be tested separately. - wait_time_ms: 20 + wait_time_ms: 5 outputs: a: cancelled_step_output: !expr $.steps.long_wait.outputs @@ -123,7 +155,7 @@ steps: deployment_type: "builtin" step: wait input: - wait_time_ms: 80 + wait_time_ms: 10 # Delay needs to be delayed long enough to ensure that last_step isn't running when it's cancelled by short_wait delay: plugin: @@ -131,7 +163,7 @@ steps: deployment_type: "builtin" step: wait input: - wait_time_ms: 50 + wait_time_ms: 5 last_step: plugin: src: "n/a" @@ -197,20 +229,20 @@ steps: deployment_type: "builtin" step: wait input: - wait_time_ms: 100 + wait_time_ms: 20 step_to_cancel: plugin: src: "n/a" deployment_type: "builtin" step: wait input: - wait_time_ms: 0 + wait_time_ms: 1000 # You can verify that this test works by commenting out this line. It should fail. stop_if: !expr $.steps.short_wait.outputs # Delay needs to be delayed long enough to ensure that it's in a deploy state when it's cancelled by short_wait deploy: deployer_name: "test-impl" - deploy_time: 50 # 50 ms + deploy_time: 20 # ms short_wait: plugin: src: "n/a" @@ -248,37 +280,6 @@ func TestDeploymentStepCancellation(t *testing.T) { assert.LessThan(t, duration.Milliseconds(), 200) } -var simpleValidLiteralInputWaitWorkflowDefinition = ` -version: v0.2.0 -input: - root: RootObject - objects: - RootObject: - id: RootObject - properties: {} -steps: - wait_1: - plugin: - src: "n/a" - deployment_type: "builtin" - step: wait - input: - wait_time_ms: 0 -outputs: - a: - b: !expr $.steps.wait_1.outputs -` - -func TestSimpleValidWaitWorkflow(t *testing.T) { - // Just a single wait - preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( - getTestImplPreparedWorkflow(t, simpleValidLiteralInputWaitWorkflowDefinition), - ) - outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) - assert.NoError(t, err) - assert.Equals(t, outputID, "a") -} - func TestWithDoubleSerializationDetection(t *testing.T) { preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( getTestImplPreparedWorkflow(t, simpleValidLiteralInputWaitWorkflowDefinition), @@ -307,7 +308,7 @@ func TestWithDoubleSerializationDetection(t *testing.T) { map[string]any{}, }, } - for _, i := range testIter { + for _, testData := range testIter { errorDetect := util.NewInvalidSerializationDetectorSchema() // Inject the error detector into the object rootObject.PropertiesValue["error_detector"] = schema.NewPropertySchema( @@ -317,10 +318,10 @@ func TestWithDoubleSerializationDetection(t *testing.T) { nil, nil, nil, - i.defaultSpec, + testData.defaultSpec, nil, ) - outputID, _, err := preparedWorkflow.Execute(context.Background(), i.input) + outputID, _, err := preparedWorkflow.Execute(context.Background(), testData.input) assert.NoError(t, err) assert.Equals(t, outputID, "a") // Confirm that, while we did no double-unserializations or double-serializations, @@ -361,7 +362,7 @@ outputs: ` func TestWaitForSerial(t *testing.T) { - // For this test, a workflow runs two steps, where each step runs a wait step for 5s + // For this test, a workflow runs two steps, where each step runs a wait step for 10ms // The second wait step waits for the first to succeed after which it runs // Due to the wait for condition, the steps will execute serially // The total execution time for this test function should be greater than 10seconds @@ -524,7 +525,7 @@ steps: wait_time_ms: !expr $.input.wait_time_ms outputs: success: - b: !expr $.steps.wait_1.outputs + b: !expr $.steps.wait_1.outputs ` func TestWaitForSerial_Foreach(t *testing.T) { @@ -538,7 +539,7 @@ func TestWaitForSerial_Foreach(t *testing.T) { // run serially. logConfig := log.Config{ - Level: log.LevelInfo, + Level: log.LevelDebug, Destination: log.DestinationStdout, } logger := log.New( @@ -752,7 +753,12 @@ func TestMissingInputsFailedDeployment(t *testing.T) { ) outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) assert.Error(t, err) + t.Logf("Test output: %s", err.Error()) assert.Equals(t, outputID, "") + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } } var missingInputsWrongOutputWorkflowDefinition = ` @@ -796,7 +802,7 @@ func TestMissingInputsWrongOutput(t *testing.T) { assert.Equals(t, outputID, "") } -var fiveSecWaitWorkflowDefinition = ` +var fourSecWaitWorkflowDefinition = ` version: v0.2.0 input: root: RootObject @@ -810,23 +816,19 @@ steps: src: "n/a" deployment_type: "builtin" step: wait + closure_wait_timeout: 5000 input: - wait_time_ms: 5000 + wait_time_ms: 4000 outputs: success: first_step_output: !expr $.steps.long_wait.outputs ` func TestEarlyContextCancellation(t *testing.T) { - // For this test, a workflow runs two steps, where each step runs a wait step for 5s - // The second wait step waits for the first to succeed after which it runs - // Due to the wait for condition, the steps will execute serially - // The total execution time for this test function should be greater than 10seconds - // as each step runs for 5s and are run serially - // The test double deployer will be used for this test, as we - // need a deployer to test the plugin step provider. + // Test to ensure the workflow aborts when instructed to. + // The wait step should exit gracefully when the workflow is cancelled. logConfig := log.Config{ - Level: log.LevelInfo, + Level: log.LevelDebug, Destination: log.DestinationStdout, } logger := log.New( @@ -843,10 +845,10 @@ func TestEarlyContextCancellation(t *testing.T) { stepRegistry, builtinfunctions.GetFunctions(), )) - wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(fiveSecWaitWorkflowDefinition))) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(fourSecWaitWorkflowDefinition))) preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) - // Cancel the context after 3 ms to simulate cancellation with ctrl-c. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3) + // Cancel the context after a timeout to simulate cancellation with ctrl-c. + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) startTime := time.Now() // Right before execute to not include pre-processing time. //nolint:dogsled _, _, _ = preparedWorkflow.Execute(ctx, map[string]any{}) @@ -855,7 +857,7 @@ func TestEarlyContextCancellation(t *testing.T) { duration := time.Since(startTime) t.Logf("Test execution time: %s", duration) if duration >= 1000*time.Millisecond { - t.Fatalf("Test execution time is greater than 1000 milliseconds; Is the workflow properly cancelling?") + t.Fatalf("Test execution time is greater than 100 milliseconds; Is the workflow properly cancelling?") } } @@ -1184,14 +1186,14 @@ steps: deployer_name: "test-impl" # stop_if doesn't create any dependency, so we must keep the step in the deployment # stage long enough for the cancellation to occur at the intended stage. - deploy_time: 50 # ms + deploy_time: 25 # ms input: # The actual wait time should not matter for this test because the intention is to # to cancel it before it is run. wait_time_ms: 0 stop_if: $.input.step_cancelled outputs: - success: + did-not-cancel: simple_wait_output: !expr $.steps.simple_wait.outputs.success ` @@ -1207,11 +1209,14 @@ func TestInputCancelledStepWorkflow(t *testing.T) { preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( getTestImplPreparedWorkflow(t, inputCancelledStepWorkflow), ) - _, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ "step_cancelled": true, }) - assert.Error(t, err) - assert.Contains(t, err.Error(), "cannot construct any output") + assert.Equals(t, outputID, "") + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } } var inputDisabledStepWorkflow = ` @@ -1235,7 +1240,7 @@ steps: wait_time_ms: 20 enabled: !expr $.input.step_enabled outputs: - success: + workflow-success: simple_wait_output: !expr $.steps.simple_wait.outputs.success ` @@ -1253,13 +1258,16 @@ func TestInputDisabledStepWorkflow(t *testing.T) { "step_enabled": true, }) assert.NoError(t, err) - assert.Equals(t, outputID, "success") + assert.Equals(t, outputID, "workflow-success") // The workflow should fail with it disabled because the output cannot be resolved. _, _, err = preparedWorkflow.Execute(context.Background(), map[string]any{ "step_enabled": false, }) assert.Error(t, err) - assert.Contains(t, err.Error(), "cannot construct any output") + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } } var dynamicDisabledStepWorkflow = ` @@ -1288,7 +1296,7 @@ steps: step: wait input: wait_time_ms: 0 - enabled: !expr $.steps.initial_wait.outputs.success.message == "Plugin slept for 20 ms." + enabled: !expr $.steps.initial_wait.outputs.success.message == "Plugin slept for 5 ms." outputs: success: initial_wait_output: !expr $.steps.initial_wait.outputs.success @@ -1306,16 +1314,16 @@ func TestDelayedDisabledStepWorkflow(t *testing.T) { preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( getTestImplPreparedWorkflow(t, dynamicDisabledStepWorkflow), ) - // The second step expects a 20ms sleep/wait. - // Pass with a 20ms input. + // The second step expects a 5ms sleep/wait. + // Pass with a 5ms input. outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ - "sleep_time": 20, + "sleep_time": 5, }) assert.NoError(t, err) assert.Equals(t, outputID, "success") - // Fail with a non-20ms input. + // Fail with a non-5ms input. outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{ - "sleep_time": 19, + "sleep_time": 4, }) assert.NoError(t, err) assert.Equals(t, outputID, "disabled") @@ -1368,6 +1376,637 @@ func TestExpressionWithWhitespace(t *testing.T) { }) } +var multiDependencyFailureWorkflowWithDisabling = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + fail_purposefully: + type: + type_id: bool +steps: + disabled_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 + enabled: !expr '!$.input.fail_purposefully' + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + closure_wait_timeout: 0 + input: + wait_time_ms: 5000 +outputs: + workflow-success: + simple_wait_output: !expr $.steps.disabled_step.outputs.success + long_wait_output: !expr $.steps.long_wait.outputs.success +` + +func TestMultiDependencyWorkflowFailureWithDisabling(t *testing.T) { + // Tests failure when one dependency is disabled immediately, and the other one completes later. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyFailureWorkflowWithDisabling), + ) + startTime := time.Now() // Right before execution to not include pre-processing time. + _, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "fail_purposefully": true, + }) + duration := time.Since(startTime) + assert.Error(t, err) + t.Logf("MultiDependencyFailureWithDisabling workflow failed purposefully in %d ms", duration.Milliseconds()) + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } + assert.LessThan(t, duration.Milliseconds(), 500) // It will take 5 seconds if it fails to fail early. +} + +var multiDependencyFailureWorkflowWithCancellation = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + fail_purposefully: + type: + type_id: bool +steps: + canceled_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 + stop_if: !expr '$.input.fail_purposefully' + deploy: + deployer_name: "test-impl" + deploy_time: 10 # (ms) delay to make the cancellation more reliable. + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + closure_wait_timeout: 0 + input: + wait_time_ms: 5000 +outputs: + workflow-success: + cancelled_step_output: !expr $.steps.canceled_step.outputs.success + long_wait_output: !expr $.steps.long_wait.outputs.success +` + +func TestMultiDependencyWorkflowFailureWithCancellation(t *testing.T) { + // Tests failure when one dependency is cancelled immediately, and the other one completes later. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyFailureWorkflowWithCancellation), + ) + startTime := time.Now() // Right before execution to not include pre-processing time. + _, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "fail_purposefully": true, + }) + duration := time.Since(startTime) + assert.Error(t, err) + t.Logf("MultiDependencyFailureWithCancellation workflow failed purposefully in %d ms", duration.Milliseconds()) + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } + assert.LessThan(t, duration.Seconds(), 4) // It will take 5 seconds if it fails to fail early. +} + +var multiDependencyFailureWorkflowWithErrorOut = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + fail_purposefully: + type: + type_id: bool +steps: + failed_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: hello + input: + fail: !expr $.input.fail_purposefully + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + closure_wait_timeout: 0 + input: + wait_time_ms: 7000 +outputs: + workflow-success: + failed_output: !expr $.steps.failed_step.outputs.success + long_wait_output: !expr $.steps.long_wait.outputs.success +` + +func TestMultiDependencyWorkflowFailureWithErrorOut(t *testing.T) { + // Tests failure when one dependency fails immediately due to the wrong output, and the + // other one completes later. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyFailureWorkflowWithErrorOut), + ) + startTime := time.Now() // Right before execution to not include pre-processing time. + _, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "fail_purposefully": true, + }) + duration := time.Since(startTime) + assert.Error(t, err) + t.Logf("MultiDependencyFailureWithErrorOut workflow failed purposefully in %d ms", duration.Milliseconds()) + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } + // If it takes 5 seconds, then there was a deadlock in the client. + // If it takes 6 seconds, then it waited for the second step. + assert.LessThan(t, duration.Milliseconds(), 5500) +} + +var multiDependencyFailureWorkflowWithDeployFailure = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + fail_purposefully: + type: + type_id: bool +steps: + failed_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 + deploy: + deployer_name: "test-impl" + deploy_succeed: !expr '!$.input.fail_purposefully' + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + closure_wait_timeout: 0 + input: + wait_time_ms: 7000 +outputs: + workflow-success: + failed_step_output: !expr $.steps.failed_step.outputs + simple_wait_output: !expr $.steps.long_wait.outputs +` + +func TestMultiDependencyWorkflowFailureWithDeployFailure(t *testing.T) { + // Tests failure when one dependency fails (due to failed deployment) immediately, + // and the other one fails later. + // In this specific test the output depends on the `steps.failed_step.outputs` node + // instead of the `steps.failed_step.outputs.success` node because they are handled + // differently in the engine. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyFailureWorkflowWithDeployFailure), + ) + startTime := time.Now() // Right before execution to not include pre-processing time. + _, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "fail_purposefully": true, + }) + duration := time.Since(startTime) + assert.Error(t, err) + t.Logf("MultiDependency workflow failed purposefully in %d ms", duration.Milliseconds()) + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } + // If it takes 5 seconds, then there was a deadlock in the client. + // If it takes 6 seconds, then it waited for the second step. + assert.LessThan(t, duration.Milliseconds(), 5500) +} + +var multiDependencyFailureWorkflowWithDoubleFailure = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: {} +steps: + failed_step_A: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 + deploy: # This fails first. + deployer_name: "test-impl" + deploy_time: 0 + deploy_succeed: !expr 'false' + quick_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 1 # Second, this succeeds, which cancels the second failing step. + failed_step_B: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + wait_for: !expr $.steps.failed_step_A.outputs # Makes it unresolvable + stop_if: !expr $.steps.quick_step.outputs # Hopefully triggers the second resolution. + input: + wait_time_ms: 0 + deploy: + deployer_name: "test-impl" + deploy_succeed: !expr 'true' + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + closure_wait_timeout: 0 + input: + wait_time_ms: 10 +outputs: + workflow-success: + failed_step_output: !expr $.steps.failed_step_A.outputs + wait-only: # For the error to be exposed, we need an alternative output that persists beyond the error. + wait_output: !expr $.steps.long_wait.outputs +` + +func TestMultiDependencyWorkflowFailureDoubleFailure(t *testing.T) { + // Creates a scenario where step B's starting (due to wait-for) depends on step A's outputs, + // making A's outputs become unresolvable, while at the same time the step that needs that info (B) crashes. + // Transitioning B to crashed resolves starting, so that is in conflict with the unresolvable + // state propagated from step A. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyFailureWorkflowWithDoubleFailure), + ) + + startTime := time.Now() // Right before execution to not include pre-processing time. + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + duration := time.Since(startTime) + assert.NoError(t, err) + assert.Equals(t, outputID, "wait-only") + t.Logf("MultiDependency DoubleFailure finished in %d ms", duration.Milliseconds()) +} + +var multiDependencyFailureWorkflowContextCancelled = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: {} +steps: + wait_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + closure_wait_timeout: 0 + input: + wait_time_ms: 500 + cancelled_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + stop_if: !expr $.steps.wait_step.outputs # The context will get cancelled before this triggers. + input: + wait_time_ms: 0 + deploy: + deployer_name: "test-impl" + deploy_succeed: !expr 'true' + deploy_time: 5 # ms +outputs: + finished: + cancelled_step_output: !expr $.steps.cancelled_step.outputs + wait-only: # The workflow needs to keep running after the cancelled step exits. + wait_output: !expr $.steps.wait_step.outputs +` + +func TestMultiDependencyWorkflowContextCanceled(t *testing.T) { + // A scenario where the step's inputs are resolvable, but the context is cancelled, resulting + // in a possible conflict with the cancelled step stage DAG node. + // To do this, create a multi-dependency setup, finish the step, then cancel the workflow + // before the workflow finishes. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyFailureWorkflowContextCancelled), + ) + + ctx, timeout := context.WithTimeout(context.Background(), time.Millisecond*30) + startTime := time.Now() // Right before execution to not include pre-processing time. + _, _, err := preparedWorkflow.Execute(ctx, map[string]any{}) + duration := time.Since(startTime) + assert.NoError(t, err) + timeout() + t.Logf("MultiDependency ContextCanceled finished in %d ms", duration.Milliseconds()) +} + +var multiDependencyDependOnClosedStepPostDeployment = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: {} +steps: + wait_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 1 + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 5000 + cancelled_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + stop_if: !expr $.steps.wait_step.outputs + wait_for: !expr $.steps.long_wait.outputs + input: + wait_time_ms: 0 + # The deploy section is blank, so it will target a post-deployment cancellation. +outputs: + finished: + cancelled_step_output: !expr $.steps.cancelled_step.outputs + closed: # The workflow needs to keep running after the cancelled step exits. + closed_output: !expr $.steps.cancelled_step.closed.result + wait_finished: + wait_output: !expr $.steps.long_wait.outputs +` + +func TestMultiDependencyDependOnClosedStepPostDeployment(t *testing.T) { + // This has the output depend on the closed output of a step. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyDependOnClosedStepPostDeployment), + ) + + startTime := time.Now() // Right before execution to not include pre-processing time. + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + duration := time.Since(startTime) + t.Logf("MultiDependency DependOnClosedStepPostDeployment finished in %d ms", duration.Milliseconds()) + assert.NoError(t, err) + assert.Equals(t, outputID, "closed") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "closed_output": map[any]any{ + "cancelled": true, + "close_requested": false, + }, + }) +} + +var multiDependencyDependOnClosedDeployment = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: {} +steps: + wait_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 1 + cancelled_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + stop_if: !expr $.steps.wait_step.outputs + input: + wait_time_ms: 500 + # The deploy section has a delay black, so it will target a mid-deployment cancellation. + closure_wait_timeout: 0 + deploy: + deployer_name: "test-impl" + deploy_time: 10 # ms. If this is too low, there will be a race that results in run fail instead of closure. +outputs: + finished: + cancelled_step_output: !expr $.steps.cancelled_step.outputs + closed: # The workflow needs to keep running after the cancelled step exits. + closed_output: !expr $.steps.cancelled_step.closed.result +` + +func TestMultiDependencyDependOnClosedStepDeployment(t *testing.T) { + // This has the output depend on the closed output of a step. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyDependOnClosedDeployment), + ) + + startTime := time.Now() // Right before execution to not include pre-processing time. + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + duration := time.Since(startTime) + t.Logf("MultiDependency DependOnClosedStep finished in %d ms", duration.Milliseconds()) + assert.NoError(t, err) + assert.Equals(t, outputID, "closed") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "closed_output": map[any]any{ + "cancelled": true, + "close_requested": false, + }, + }) +} + +var multiDependencyDependOnContextDoneDeployment = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: {} +steps: + wait_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + closure_wait_timeout: 0 + input: + wait_time_ms: 1000 + not_enabled_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + enabled: !expr $.steps.wait_step.outputs.success.message == "Plugin slept for 100 ms." + input: + wait_time_ms: 0 +outputs: + finished: + cancelled_step_output: !expr $.steps.not_enabled_step.outputs + closed: # The workflow needs to keep running after the cancelled step exits. + closed_output: !expr $.steps.not_enabled_step.closed.result +` + +func TestMultiDependencyDependOnContextDoneDeployment(t *testing.T) { + // A scenario where you close the context but still expect an output by depending on the closed output. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyDependOnContextDoneDeployment), + ) + + ctx, timeout := context.WithTimeout(context.Background(), time.Millisecond*10) + startTime := time.Now() // Right before execution to not include pre-processing time. + outputID, outputData, err := preparedWorkflow.Execute(ctx, map[string]any{}) + duration := time.Since(startTime) + assert.NoError(t, err) + timeout() + assert.Equals(t, outputID, "closed") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "closed_output": map[any]any{ + "cancelled": false, + "close_requested": true, + }, + }) + t.Logf("MultiDependency DependOnClosedStep finished in %d ms", duration.Milliseconds()) +} + +var multiDependencyForEachParent = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + subworkflow: + kind: foreach + items: + - {} + workflow: subworkflow.yaml + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 5000 +outputs: + success: + long_wait_output: !expr $.steps.long_wait.outputs + subworkflow_output: !expr $.steps.subworkflow.outputs.success +` + +var multiDependencyForEachSubwf = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + failed_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: hello + input: + fail: !expr true +outputs: + success: + b: !expr $.steps.failed_step.outputs.success +` + +func TestMultiDependencyForeach(t *testing.T) { + // This test runs a workflow with a wait and a subworkfow. + // This tests to ensure that the parent workflow immediately detects + // and acts on the missing dependency caused by the error output + // coming from the subworkflow. + + logConfig := log.Config{ + Level: log.LevelInfo, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + factories := workflowFactory{ + config: cfg, + } + deployerRegistry := deployerregistry.New( + deployer.Any(testimpl.NewFactory()), + ) + + pluginProvider := assert.NoErrorR[step.Provider](t)( + plugin.New(logger, deployerRegistry, map[string]interface{}{ + "builtin": map[string]any{ + "deployer_name": "test-impl", + "deploy_time": "0", + }, + }), + ) + stepRegistry, err := stepregistry.New( + pluginProvider, + lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)), + ) + assert.NoError(t, err) + + factories.stepRegistry = stepRegistry + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + builtinfunctions.GetFunctions(), + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(multiDependencyForEachParent))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{ + "subworkflow.yaml": []byte(multiDependencyForEachSubwf), + })) + startTime := time.Now() // Right before execute to not include pre-processing time. + _, _, err = preparedWorkflow.Execute(context.Background(), map[string]any{}) + duration := time.Since(startTime) + assert.Error(t, err) + + t.Logf("MultiDependency workflow failed purposefully in %d ms", duration.Milliseconds()) + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } + assert.LessThan(t, duration.Milliseconds(), 400) +} + func createTestExecutableWorkflow(t *testing.T, workflowStr string, workflowCtx map[string][]byte) (workflow.ExecutableWorkflow, error) { logConfig := log.Config{ Level: log.LevelDebug, @@ -1384,6 +2023,7 @@ func createTestExecutableWorkflow(t *testing.T, workflowStr string, workflowCtx } const printNamespaceResponseOutput = `OBJECT NAMESPACE +ClosedInfo $.steps.long_wait.closed.outputs.result Crashed $.steps.long_wait.crashed.outputs.error DeployError $.steps.long_wait.deploy_failed.outputs.error DisabledMessageOutput $.steps.long_wait.disabled.outputs.output @@ -1397,7 +2037,7 @@ StartedOutput $.steps.long_wait.starting.outputs.started func TestPrintObjectNamespaceTable(t *testing.T) { preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( - getTestImplPreparedWorkflow(t, fiveSecWaitWorkflowDefinition), + getTestImplPreparedWorkflow(t, fourSecWaitWorkflowDefinition), ) buf := bytes.NewBuffer(nil) workflow.PrintObjectNamespaceTable(buf, preparedWorkflow.Namespaces(), nil)