From b292124e088c2012c87211a05ddc35889ce5d7ce Mon Sep 17 00:00:00 2001 From: Jared O'Connell <46976761+jaredoconnell@users.noreply.github.com> Date: Thu, 26 Oct 2023 13:54:43 -0400 Subject: [PATCH] ATP v3, Validate Compatibility, Improved stop_if, better logs, retry fix (#114) * Added workflow pre-validation Also improved debug messages * Updated go mod/sum * Progress towards debugging deadlock detection Mostly this is test cases, and some mild refactoring that should have a similar result * Use image that supports ARM64 * Ensure that run finishes before closing the step This ensures that the goroutines are done when the engine exits. This fixes a test case, and will be important when retrying is added to the deadlock check * Added deadlock check retries This is a workaround for the situation where all of the steps are processing at the same time, all waiting for input * Update test cases to take less time Also improved error logic * Update dependencies Required for prior commits to work * Update go SDK to dev branch This adds the done message to the SDK * Updated test deployer to dev branch * Updated deployers, and added python deployer to run-plugin * Label loggers * Update to use ATP v3 This means new error handling, so multiple errors can be reported * Update SDK in go mod, and add force stop to step interface Force stop is good for when you do not need to wait for the ATP client, or something equivalent, to finish. Waiting for the ATP client to finish means waiting for the steps to finish entirely. * Cleanup * Fix tests * Update SDK and test deployer * Reduce redundancy in Plugin Provider * Refactoring to fix linting errors I reduced the amount of code in one function, and reduced redundancy of unlocking * Fix linting errors These were only shown on CI, not locally * bump internal dependencies * update all internal dependencies * Remove fragile test * bump python deployer to 0.3.0 --------- Signed-off-by: Dustin Black Co-authored-by: Dustin Black --- cmd/arcaflow/main.go | 2 +- cmd/run-plugin/run.go | 64 ++- engine_test.go | 4 +- go.mod | 30 +- go.sum | 70 ++-- internal/step/dummy/provider.go | 18 +- internal/step/dummy/provider_test.go | 6 +- internal/step/foreach/provider.go | 31 +- internal/step/plugin/provider.go | 583 +++++++++++++++----------- internal/step/plugin/provider_test.go | 81 +++- internal/step/provider.go | 10 +- workflow/any.go | 9 + workflow/executor.go | 171 +++++++- workflow/executor_test.go | 193 ++++++++- workflow/model.go | 23 +- workflow/workflow.go | 236 ++++++++--- workflow/workflow_test.go | 366 +++++++++------- 17 files changed, 1330 insertions(+), 567 deletions(-) diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 83cea9af..967ebf90 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -133,7 +133,7 @@ Options: } cfg.Log.Stdout = os.Stderr - logger := log.New(cfg.Log) + logger := log.New(cfg.Log).WithLabel("source", "main") dirContext, err := loadContext(dir) if err != nil { diff --git a/cmd/run-plugin/run.go b/cmd/run-plugin/run.go index be241dee..f3a19138 100644 --- a/cmd/run-plugin/run.go +++ b/cmd/run-plugin/run.go @@ -6,7 +6,10 @@ import ( "flag" "fmt" "go.flow.arcalot.io/deployer" + "go.flow.arcalot.io/pluginsdk/plugin" + "go.flow.arcalot.io/pluginsdk/schema" podman "go.flow.arcalot.io/podmandeployer" + pythondeployer "go.flow.arcalot.io/pythondeployer" "os" "os/signal" @@ -21,14 +24,19 @@ func main() { var image string var file string var stepID string + var pythonPath string var d deployer.AnyConnectorFactory var defaultConfig
any var deployerID = "docker" + var runID = "run" + var workingDir string flag.StringVar(&image, "image", image, "Docker image to run") flag.StringVar(&file, "file", file, "Input file") flag.StringVar(&stepID, "step", stepID, "Step name") flag.StringVar(&deployerID, "deployer", stepID, "The name of the deployer") + flag.StringVar(&pythonPath, "pythonpath", "", "Path to the Python environment") + flag.StringVar(&workingDir, "workingdir", "~/", "Path to store cloned repositories") flag.Parse() switch deployerID { @@ -60,35 +68,56 @@ func main() { if err != nil { panic(err) } + case "python": + pythonFactory := pythondeployer.NewFactory() + d = deployer.Any(pythonFactory) + configSchema := pythonFactory.ConfigurationSchema() + var err error + configInput := map[string]any{} + configInput["pythonPath"] = pythonPath + configInput["workdir"] = workingDir + defaultConfig, err = configSchema.UnserializeType(configInput) + if err != nil { + panic(err) + } default: - panic("No deployer or invalid deployer selected. Options: docker, podman, testimpl. Select with -deployer") + panic("No deployer or invalid deployer selected. Options: docker, podman, testimpl, python. Select with -deployer") } - - connector, err := d.Create(defaultConfig, log.New(log.Config{ + logger := log.New(log.Config{ Level: log.LevelDebug, Destination: log.DestinationStdout, - })) + }) + connector, err := d.Create(defaultConfig, logger) if err != nil { panic(err) } ctrlC := make(chan os.Signal, 1) signal.Notify(ctrlC, os.Interrupt) + // Set up the signal channel to send cancel signal on ctrl-c + toStepSignals := make(chan schema.Input, 3) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { select { case <-ctrlC: - fmt.Println("Received CTRL-C. Cancelling the context to cancel the step...") + logger.Infof("Received CTRL-C. Sending cancel signal...") + toStepSignals <- schema.Input{ + RunID: runID, + ID: plugin.CancellationSignalSchema.ID(), + InputData: make(map[string]any), + } + logger.Infof("Signal request sent to ATP client.") cancel() case <-ctx.Done(): // Done here. 
} }() - fmt.Println("Deploying") + logger.Infof("Deploying") plugin, err := connector.Deploy(ctx, image) if err != nil { + logger.Errorf("Error while deploying: %s", err) panic(err) } defer func() { @@ -97,8 +126,8 @@ func main() { } }() - atpClient := atp.NewClient(plugin) - fmt.Println("Getting schema") + atpClient := atp.NewClientWithLogger(plugin, logger) + logger.Infof("Getting schema") pluginSchema, err := atpClient.ReadSchema() if err != nil { panic(err) @@ -120,15 +149,22 @@ func main() { panic(err) } fmt.Printf("Running step %s\n", stepID) - outputID, outputData, err := atpClient.Execute(stepID, input) + result := atpClient.Execute( + schema.Input{RunID: runID, ID: stepID, InputData: input}, + toStepSignals, + nil, + ) + if err := atpClient.Close(); err != nil { + fmt.Printf("Error closing ATP client: %s", err) + } output := map[string]any{ - "outputID": outputID, - "outputData": outputData, - "err": err, + "outputID": result.OutputID, + "outputData": result.OutputData, + "err": result.Error, } - result, err := yaml.Marshal(output) + resultStr, err := yaml.Marshal(output) if err != nil { panic(err) } - fmt.Printf("%s", result) + fmt.Printf("%s", resultStr) } diff --git a/engine_test.go b/engine_test.go index bd5a45bf..3ff7cc5b 100644 --- a/engine_test.go +++ b/engine_test.go @@ -143,7 +143,7 @@ input: type_id: string steps: example: - plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.1.0 + plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1 input: name: !expr $.input.name output: @@ -174,7 +174,7 @@ input: type_id: string steps: example: - plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.1.0 + plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1 input: name: !expr $.input.name outputs: diff --git a/go.mod b/go.mod index 97cb47c1..de8d6e7d 100644 --- a/go.mod +++ b/go.mod @@ -3,18 +3,18 @@ module go.flow.arcalot.io/engine go 1.18 require ( - go.arcalot.io/assert v1.3.0 + go.arcalot.io/assert v1.6.0 go.arcalot.io/dgraph v1.1.0 go.arcalot.io/lang v1.0.0 go.arcalot.io/log/v2 v2.0.0 - go.flow.arcalot.io/deployer v0.2.0 - go.flow.arcalot.io/dockerdeployer v0.3.0 - go.flow.arcalot.io/expressions v0.2.0 - go.flow.arcalot.io/kubernetesdeployer v0.5.1 - go.flow.arcalot.io/pluginsdk v0.3.0-beta.1 - go.flow.arcalot.io/podmandeployer v0.3.1 - go.flow.arcalot.io/pythondeployer v0.1.3 - go.flow.arcalot.io/testdeployer v0.2.0 + go.flow.arcalot.io/deployer v0.3.0 + go.flow.arcalot.io/dockerdeployer v0.4.0 + go.flow.arcalot.io/expressions v0.2.1 + go.flow.arcalot.io/kubernetesdeployer v0.7.0 + go.flow.arcalot.io/pluginsdk v0.5.0 + go.flow.arcalot.io/podmandeployer v0.5.0 + go.flow.arcalot.io/pythondeployer v0.3.0 + go.flow.arcalot.io/testdeployer v0.3.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -22,12 +22,13 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/docker/distribution v2.8.2+incompatible // indirect - github.com/docker/docker v24.0.5+incompatible // indirect + github.com/distribution/reference v0.5.0 // indirect + github.com/docker/distribution v2.8.3+incompatible // indirect + github.com/docker/docker v24.0.6+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.10.0 // indirect - github.com/fxamacker/cbor/v2 v2.4.0 // indirect + github.com/fxamacker/cbor/v2 v2.5.0 // indirect github.com/go-logr/logr v1.2.4 // 
indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect @@ -47,15 +48,16 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/x448/float16 v0.8.4 // indirect - go.flow.arcalot.io/testplugin v0.1.0 // indirect + go.flow.arcalot.io/testplugin v0.2.1 // indirect golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.15.0 // indirect golang.org/x/oauth2 v0.2.0 // indirect golang.org/x/sys v0.12.0 // indirect golang.org/x/term v0.12.0 // indirect golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.2.0 // indirect + golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.13.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/go.sum b/go.sum index 50b30625..fc01050e 100644 --- a/go.sum +++ b/go.sum @@ -19,10 +19,12 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= -github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY= -github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= +github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= +github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= +github.com/docker/docker v24.0.6+incompatible h1:hceabKCtUgDqPu+qm0NgsaXf28Ljf4/pWFL7xjWWDgE= +github.com/docker/docker v24.0.6+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -39,8 +41,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= -github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88= -github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo= +github.com/fxamacker/cbor/v2 v2.5.0 h1:oHsG0V/Q6E/wqTS2O1Cozzsy69nqCiguo5Q1a1ADivE= +github.com/fxamacker/cbor/v2 v2.5.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-logr/logr v1.2.0/go.mod 
h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= @@ -90,7 +92,7 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= -github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -98,8 +100,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -108,7 +110,7 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= -github.com/moby/term v0.0.0-20221105221325-4eb28fa6025c h1:RC8WMpjonrBfyAh6VN/POIPtYD5tRAq0qMqCRjQNK+g= +github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -129,6 +131,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= @@ -139,8 +143,8 
@@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -148,32 +152,32 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1: github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.arcalot.io/assert v1.3.0 h1:+uQex4s9gezATpTyFxUY5dlAcrwI1Me5fSmdcydGHho= -go.arcalot.io/assert v1.3.0/go.mod h1:Xy3ScX0p9IMY89gdsgexOKxnmDr0nGHG9dV7p8Uxg7w= +go.arcalot.io/assert v1.6.0 h1:iKA8SZZ1MRblMX5QAwwY5RbpR+VNyp//4IU7vo08Xu0= +go.arcalot.io/assert v1.6.0/go.mod h1:Xy3ScX0p9IMY89gdsgexOKxnmDr0nGHG9dV7p8Uxg7w= go.arcalot.io/dgraph v1.1.0 h1:c0LR7+xdUy7Ki6e4nR9rBvK0Upr4Nu49fu+poP/9WMg= go.arcalot.io/dgraph v1.1.0/go.mod h1:FuNv92OgHsJYepD6Unwn+S/4DioBnv06JxQ2BtQct7E= go.arcalot.io/lang v1.0.0 h1:mgDaieT4wWdZTnR4V7+/pgYRmzfU7VZZgIzHccuxAbY= go.arcalot.io/lang v1.0.0/go.mod h1:ALqfYEhAzC2WoGLaycmJoNJd5NmkR7V1PSKp/c5D278= go.arcalot.io/log/v2 v2.0.0 h1:mbmsWDVBXZNWrDzUh5JLzeGCQ59kTuMFs+pyfJGc1hk= go.arcalot.io/log/v2 v2.0.0/go.mod h1:1V8jnFIIGwh2CtcGkHNOmy1nCo7LbazQNkUcnKYNMn4= -go.flow.arcalot.io/deployer v0.2.0 h1:CpkCYlB8NfpmELIEPdw3/al8XknCSfD/L2vie2lJBJo= -go.flow.arcalot.io/deployer v0.2.0/go.mod h1:xVSB+svHVPmX6yTZIU0K4U/pDbs+rezsWa69vYA+E6k= -go.flow.arcalot.io/dockerdeployer v0.3.0 h1:9F9ZyaiB1kwKLRMXAktkyD3eDVvWoMYSHCojTrPDusk= -go.flow.arcalot.io/dockerdeployer v0.3.0/go.mod h1:9cMMHmkHxzHgLOLva6RmGb5d8jrbRkar/PcjeiU0QvA= -go.flow.arcalot.io/expressions v0.2.0 h1:2k8InnpLqVmv5SDvJ1xRz1ubqF+zKN44U08D50TkqJs= -go.flow.arcalot.io/expressions v0.2.0/go.mod h1:m4p6oCwjjxRjPCAYGeZDNP9DGfoDODHo1z4pBoO+Lpc= -go.flow.arcalot.io/kubernetesdeployer v0.5.1 h1:YpPFSouEWjxN8sxdr4BuZMl3IM9jCGEXBAnyoWJnHls= -go.flow.arcalot.io/kubernetesdeployer v0.5.1/go.mod h1:mdhwBGQ0wlquo+wUR0VNSAyj9NYxDdTwM7vYV1XmRnw= -go.flow.arcalot.io/pluginsdk v0.3.0-beta.1 h1:RrC5SKDkhwG/enE/FajAxRF1izET61/LO4lhaI9q094= -go.flow.arcalot.io/pluginsdk v0.3.0-beta.1/go.mod h1:7cEk8LSxpZakyfrmKTPbiMhlrZvWtCPYcaI7qfSu8MM= -go.flow.arcalot.io/podmandeployer v0.3.1 h1:AbRmTTtuK50PLkZyu194oSra0zUCeM3lDCWTc7oo4Ys= -go.flow.arcalot.io/podmandeployer v0.3.1/go.mod h1:SmROc9nHG+KfKasyTeKtGmI9EBlXCupXjBIgX5glGn8= -go.flow.arcalot.io/pythondeployer v0.1.3 h1:3W0tm1kdIqpP6TKO7OvVCiDlUh39wQOFwDnHISReE9Q= -go.flow.arcalot.io/pythondeployer v0.1.3/go.mod h1:vCkwB72TinFVb367/o0djptTvR+V004i1I5OQUeCcPU= -go.flow.arcalot.io/testdeployer v0.2.0 h1:4/cLr58/e6o5ouVRuJ5hM28nhciwJrL9AOE5Sdb7rN0= -go.flow.arcalot.io/testdeployer v0.2.0/go.mod h1:vy3Iu+9SHmugvOJRtMWAj8R+SE9BYi7k9Xi7DM5n7eQ= 
-go.flow.arcalot.io/testplugin v0.1.0 h1:I2BT978XISjaSnQbpaJfmjo2cTmTeBV7q+1IwTGbrig= -go.flow.arcalot.io/testplugin v0.1.0/go.mod h1:RsEWotEbX4irH+OM/d3dUOZMhHeBDDhqUymIKqZlESU= +go.flow.arcalot.io/deployer v0.3.0 h1:LPikgRG5jGA76W8JthycvzfREL5Y0+++KAiQxSnKhdU= +go.flow.arcalot.io/deployer v0.3.0/go.mod h1:x6gsz/hANR8qN1nerpyY3vXpdaqofDH5Wlg+Nsqg/x0= +go.flow.arcalot.io/dockerdeployer v0.4.0 h1:t5b8o3xfKKb/WIX558486csjo4uMQmAXsikBLsKFEIg= +go.flow.arcalot.io/dockerdeployer v0.4.0/go.mod h1:UZSM6buJBRlgCURUE/BVkak8tfAXzj3oeQBSRZECbSc= +go.flow.arcalot.io/expressions v0.2.1 h1:TAAbDrgJJLpmgA5ASyP/KzrXWtpEaQ8JsCPHgpe5kLw= +go.flow.arcalot.io/expressions v0.2.1/go.mod h1:Vw1ScNu4Uyw1/l87LAH8jxe0DyRWwMh+rlfB/BPYDOU= +go.flow.arcalot.io/kubernetesdeployer v0.7.0 h1:r41qWc/XiPy9l3cfMXZG8F2kGenRh1xsx2auim/Ydyw= +go.flow.arcalot.io/kubernetesdeployer v0.7.0/go.mod h1:VvU6duoo5NR2ITUhx/UCGrkdJnXIeYm+/yHmGKtkXsk= +go.flow.arcalot.io/pluginsdk v0.5.0 h1:TRS/waCTcdoMZ9neDAcfy3zpzyDnPHRbhV+Y1kpcw3Y= +go.flow.arcalot.io/pluginsdk v0.5.0/go.mod h1:2s2f//7uOkBjr1QaiWJD/bqDIeLlINJtD1BhiY4aGPM= +go.flow.arcalot.io/podmandeployer v0.5.0 h1:h7hEhWUgxJzNKlEohZ+meKhl3FWjaXQahQ8vN3YVRNs= +go.flow.arcalot.io/podmandeployer v0.5.0/go.mod h1:36JCcTB6nauahcXUPfIpdEw7Zfp0ufM07o3VNTvrCc0= +go.flow.arcalot.io/pythondeployer v0.3.0 h1:ercLuDwFoDSL0f6YvZEqFW0/nO7Yv7DkbROl3rKxYDk= +go.flow.arcalot.io/pythondeployer v0.3.0/go.mod h1:ND1x/Vhu/6q50zQeisCcD6oQ6lKVJFflOrfDccnIjSY= +go.flow.arcalot.io/testdeployer v0.3.0 h1:Soyz+rDa3Y3VjWBGuL3zNlX3LM4uKp9Ex7///fCgrZA= +go.flow.arcalot.io/testdeployer v0.3.0/go.mod h1:Eel0ORhtKdYYDsd+e+btBBygIn+9Sz/b+JFDwH39VWI= +go.flow.arcalot.io/testplugin v0.2.1 h1:9kQ2MKvcXtEcwk5c4qSWN+FovpER2C9vn730laAm9iE= +go.flow.arcalot.io/testplugin v0.2.1/go.mod h1:ZoVF8tIKppQmj5nvoZPA48GQ7BuoWXQcuCw2x2sJxjE= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -234,8 +238,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/time v0.2.0 h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE= -golang.org/x/time v0.2.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -295,7 +299,7 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o= 
+gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.25.4 h1:3YO8J4RtmG7elEgaWMb4HgmpS2CfY1QlaOz9nwB+ZSs= diff --git a/internal/step/dummy/provider.go b/internal/step/dummy/provider.go index cc99f333..9df7f8f9 100644 --- a/internal/step/dummy/provider.go +++ b/internal/step/dummy/provider.go @@ -176,7 +176,7 @@ func (r *runnableStep) Lifecycle(_ map[string]any) (result step.Lifecycle[step.L }, nil } -func (r *runnableStep) Start(_ map[string]any, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) { +func (r *runnableStep) Start(_ map[string]any, runID string, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) { ctx, cancel := context.WithCancel(context.Background()) @@ -189,6 +189,7 @@ func (r *runnableStep) Start(_ map[string]any, stageChangeHandler step.StageChan // the ProvideInputStage is not blocked. name: make(chan string, 1), state: step.RunningStepStateStarting, + runID: runID, } go s.run() @@ -200,10 +201,12 @@ type runningStep struct { stageChangeHandler step.StageChangeHandler ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup lock *sync.Mutex name chan string state step.RunningStepState inputAvailable bool + runID string } func (r *runningStep) State() step.RunningStepState { @@ -249,8 +252,16 @@ func (r *runningStep) Close() error { return nil } +func (r *runningStep) ForceClose() error { + return r.Close() +} + func (r *runningStep) run() { - defer close(r.name) + r.wg.Add(1) + defer func() { + close(r.name) + r.wg.Done() + }() waitingForInput := false r.lock.Lock() if !r.inputAvailable { @@ -267,6 +278,7 @@ func (r *runningStep) run() { nil, string(StageIDGreet), waitingForInput, + &r.wg, ) select { case name, ok := <-r.name: @@ -283,7 +295,7 @@ func (r *runningStep) run() { r.lock.Lock() r.state = step.RunningStepStateFinished r.lock.Unlock() - r.stageChangeHandler.OnStepComplete(r, string(StageIDGreet), outputID, outputData) + r.stageChangeHandler.OnStepComplete(r, string(StageIDGreet), outputID, outputData, &r.wg) case <-r.ctx.Done(): return } diff --git a/internal/step/dummy/provider_test.go b/internal/step/dummy/provider_test.go index 6ac333c2..622896e4 100644 --- a/internal/step/dummy/provider_test.go +++ b/internal/step/dummy/provider_test.go @@ -2,6 +2,7 @@ package dummy_test import ( "fmt" + "sync" "testing" "go.arcalot.io/assert" @@ -13,7 +14,7 @@ type stageChangeHandler struct { message chan string } -func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) { +func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) { } @@ -22,6 +23,7 @@ func (s *stageChangeHandler) OnStepComplete( previousStage string, previousStageOutputID *string, previousStageOutput *any, + _ *sync.WaitGroup, ) { if previousStage != "greet" { panic(fmt.Errorf("invalid previous stage: %s", previousStage)) @@ -49,7 +51,7 @@ func TestProvider(t *testing.T) { message: make(chan string), } - running, err := runnable.Start(map[string]any{}, handler) + running, err := runnable.Start(map[string]any{}, t.Name(), handler) assert.NoError(t, err) assert.NoError(t, running.ProvideStageInput("greet", map[string]any{ "name": "Arca Lot", diff --git a/internal/step/foreach/provider.go 
b/internal/step/foreach/provider.go index b0fa5d37..55d3b58b 100644 --- a/internal/step/foreach/provider.go +++ b/internal/step/foreach/provider.go @@ -19,7 +19,7 @@ func New( executorFactory func(logger log.Logger) (workflow.Executor, error), ) (step.Provider, error) { return &forEachProvider{ - logger: logger, + logger: logger.WithLabel("source", "foreach-provider"), yamlParserFactory: yamlParserFactory, executorFactory: executorFactory, }, nil @@ -319,9 +319,10 @@ func (r *runnableStep) RunSchema() map[string]*schema.PropertySchema { return map[string]*schema.PropertySchema{} } -func (r *runnableStep) Start(_ map[string]any, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) { +func (r *runnableStep) Start(_ map[string]any, runID string, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) { ctx, cancel := context.WithCancel(context.Background()) rs := &runningStep{ + runID: runID, ctx: ctx, cancel: cancel, lock: &sync.Mutex{}, @@ -338,6 +339,7 @@ func (r *runnableStep) Start(_ map[string]any, stageChangeHandler step.StageChan } type runningStep struct { + runID string workflow workflow.ExecutableWorkflow currentStage StageID lock *sync.Mutex @@ -345,6 +347,7 @@ type runningStep struct { inputAvailable bool inputData chan []any ctx context.Context + wg sync.WaitGroup cancel context.CancelFunc stageChangeHandler step.StageChangeHandler parallelism int64 @@ -363,13 +366,13 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro _, err := r.workflow.Input().Unserialize(item) if err != nil { r.lock.Unlock() - return fmt.Errorf("invalid input item %d for subworkflow (%w)", i, err) + return fmt.Errorf("invalid input item %d for subworkflow (%w) for run/step %s", i, err, r.runID) } input[i] = item } if r.inputAvailable { r.lock.Unlock() - return fmt.Errorf("input for execute workflow provided twice") + return fmt.Errorf("input for execute workflow provided twice for run/step %s", r.runID) } if r.currentState == step.RunningStepStateWaitingForInput && r.currentStage == StageIDExecute { r.currentState = step.RunningStepStateRunning @@ -404,11 +407,21 @@ func (r *runningStep) State() step.RunningStepState { func (r *runningStep) Close() error { r.cancel() + r.wg.Wait() return nil } +func (r *runningStep) ForceClose() error { + // For now, unless it becomes a problem, we'll just call the normal close function. 
+ return r.Close() +} + func (r *runningStep) run() { - defer close(r.inputData) + r.wg.Add(1) + defer func() { + close(r.inputData) + r.wg.Done() + }() waitingForInput := false r.lock.Lock() if !r.inputAvailable { @@ -425,6 +438,7 @@ func (r *runningStep) run() { nil, string(StageIDExecute), waitingForInput, + &r.wg, ) select { case loopData, ok := <-r.inputData: @@ -435,7 +449,7 @@ func (r *runningStep) run() { itemOutputs := make([]any, len(loopData)) itemErrors := make(map[int]string, len(loopData)) - r.logger.Debugf("Executing subworkflow...") + r.logger.Debugf("Executing subworkflow for step %s...", r.runID) wg := &sync.WaitGroup{} wg.Add(len(loopData)) errors := false @@ -471,7 +485,7 @@ func (r *runningStep) run() { }() } wg.Wait() - r.logger.Debugf("Subworkflow complete.") + r.logger.Debugf("Subworkflow %s complete.", r.runID) r.lock.Lock() previousStage := string(r.currentStage) r.currentState = step.RunningStepStateRunning @@ -506,12 +520,13 @@ func (r *runningStep) run() { nil, string(currentStage), false, + &r.wg, ) r.lock.Lock() r.currentState = step.RunningStepStateFinished previousStage = string(r.currentStage) r.lock.Unlock() - r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData) + r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg) case <-r.ctx.Done(): return } diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index a1708766..71fc0354 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -3,8 +3,11 @@ package plugin import ( "context" "fmt" + "go.flow.arcalot.io/pluginsdk/plugin" + "reflect" "strings" "sync" + "time" log "go.arcalot.io/log/v2" "go.flow.arcalot.io/deployer" @@ -28,12 +31,12 @@ func New(logger log.Logger, deployerRegistry registry.Registry, localDeployerCon if err != nil { return nil, fmt.Errorf("failed to load local deployer configuration, please check your Arcaflow configuration file (%w)", err) } - localDeployer, err := deployerRegistry.Create(unserializedLocalDeployerConfig, logger) + localDeployer, err := deployerRegistry.Create(unserializedLocalDeployerConfig, logger.WithLabel("source", "deployer")) if err != nil { return nil, fmt.Errorf("invalid local deployer configuration, please check your Arcaflow configuration file (%w)", err) } return &pluginProvider{ - logger: logger, + logger: logger.WithLabel("source", "plugin-provider"), deployerRegistry: deployerRegistry, localDeployer: localDeployer, }, nil @@ -166,6 +169,7 @@ var crashedLifecycleStage = step.LifecycleStage{ FinishedName: "crashed", } +// Lifecycle returns a lifecycle that contains all plugin lifecycle stages. func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { return step.Lifecycle[step.LifecycleStage]{ InitialStage: string(StageIDDeploy), @@ -181,11 +185,14 @@ func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { } } +// LoadSchema deploys the plugin, connects to the plugin's ATP server, loads its schema, then +// returns a runnableStep struct. Not to be confused with the runningStep struct. 
func (p *pluginProvider) LoadSchema(inputs map[string]any, _ map[string][]byte) (step.RunnableStep, error) { image := inputs["plugin"].(string) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + plugin, err := p.localDeployer.Deploy(ctx, image) if err != nil { cancel() @@ -199,13 +206,17 @@ func (p *pluginProvider) LoadSchema(inputs map[string]any, _ map[string][]byte) cancel() return nil, fmt.Errorf("failed to read plugin schema from %s (%w)", image, err) } + // Tell the server that the client is done + if err := transport.Close(); err != nil { + return nil, fmt.Errorf("failed to instruct client to shut down image %s (%w)", image, err) + } // Shut down the plugin. if err := plugin.Close(); err != nil { return nil, fmt.Errorf("failed to shut down local plugin from %s (%w)", image, err) } return &runnableStep{ - schemas: s, + schemas: *s, logger: p.logger, image: image, deployerRegistry: p.deployerRegistry, @@ -217,7 +228,7 @@ type runnableStep struct { image string deployerRegistry registry.Registry logger log.Logger - schemas schema.Schema[schema.Step] + schemas schema.SchemaSchema localDeployer deployer.Connector } @@ -261,6 +272,32 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st if !ok { return result, fmt.Errorf("the step '%s' does not exist in the '%s' plugin", stepID, r.image) } + + stopIfProperty := schema.NewPropertySchema( + schema.NewAnySchema(), + schema.NewDisplayValue( + schema.PointerTo("Stop condition"), + schema.PointerTo("If this field is filled with a non-false value, the step is cancelled (even if currently executing)."), + nil, + ), + false, + nil, + nil, + nil, + nil, + nil, + ) + // Now validate that the step's internal dependencies can be resolved (like stop_if's dependency on the cancel signal) + cancelSignal := stepSchema.SignalHandlers()[plugin.CancellationSignalSchema.ID()] + if cancelSignal == nil { + // Not present + stopIfProperty.Disable(fmt.Sprintf("Cancel signal with ID '%s' is not present in plugin image '%s', step '%s'. 
Signal handler IDs present: %v", + plugin.CancellationSignalSchema.ID(), r.image, stepID, reflect.ValueOf(stepSchema.SignalHandlers()).MapKeys())) + } else if err := plugin.CancellationSignalSchema.DataSchemaValue.ValidateCompatibility(cancelSignal.DataSchemaValue); err != nil { + // Present but incompatible + stopIfProperty.Disable(fmt.Sprintf("Cancel signal invalid schema in plugin image '%s', step '%s' (%s)", r.image, stepID, err)) + } + return step.Lifecycle[step.LifecycleStageWithSchema]{ InitialStage: "deploying", Stages: []step.LifecycleStageWithSchema{ @@ -350,20 +387,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st { LifecycleStage: cancelledLifecycleStage, InputSchema: map[string]*schema.PropertySchema{ - "stop_if": schema.NewPropertySchema( - schema.NewAnySchema(), - schema.NewDisplayValue( - schema.PointerTo("Stop condition"), - schema.PointerTo("If this field is filled with a non-false value, the step is cancelled (even if currently executing)."), - nil, - ), - false, - nil, - nil, - nil, - nil, - nil, - ), + "stop_if": stopIfProperty, }, Outputs: nil, }, @@ -403,7 +427,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st }, nil } -func (r *runnableStep) Start(input map[string]any, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) { +func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) { rawStep, ok := input["step"] stepID := "" if ok && rawStep != nil { @@ -448,15 +472,17 @@ func (r *runnableStep) Start(input map[string]any, stageChangeHandler step.Stage lock: &sync.Mutex{}, ctx: ctx, cancel: cancel, - done: make(chan struct{}), deployInput: make(chan any, 1), runInput: make(chan any, 1), logger: r.logger, image: r.image, - step: stepID, + pluginStepID: stepID, state: step.RunningStepStateStarting, localDeployer: r.localDeployer, - executionChannel: make(chan executionResult), + executionChannel: make(chan atp.ExecutionResult), + signalToStep: make(chan schema.Input), + signalFromStep: make(chan schema.Input), + runID: runID, } go s.run() @@ -469,34 +495,43 @@ type runningStep struct { stepSchema schema.Step stageChangeHandler step.StageChangeHandler lock *sync.Mutex + wg sync.WaitGroup ctx context.Context cancel context.CancelFunc - done chan struct{} + atpClient atp.Client deployInput chan any deployInputAvailable bool runInput chan any runInputAvailable bool logger log.Logger currentStage StageID + runID string // The ID associated with this execution (the workflow step ID) image string - step string + pluginStepID string // The ID of the step in the plugin state step.RunningStepState useLocalDeployer bool localDeployer deployer.Connector container deployer.Plugin - executionChannel chan executionResult + executionChannel chan atp.ExecutionResult + signalToStep chan schema.Input // Communicates with the ATP client, not other steps. + signalFromStep chan schema.Input // Communicates with the ATP client, not other steps. + closed bool + // Store channels for sending pre-calculated signal outputs to other steps? + // Store channels for receiving pre-calculated signal inputs from other steps? 
} func (r *runningStep) CurrentStage() string { r.lock.Lock() defer r.lock.Unlock() - return string(r.currentStage) + tempStage := string(r.currentStage) + return tempStage } func (r *runningStep) State() step.RunningStepState { r.lock.Lock() defer r.lock.Unlock() - return r.state + tempState := r.state + return tempState } func (r *runningStep) ProvideStageInput(stage string, input map[string]any) error { @@ -504,104 +539,173 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro // affect the counting of step states in the workflow's Execute function // and notifySteps function. r.lock.Lock() + defer r.lock.Unlock() + r.logger.Debugf("ProvideStageInput START") + defer r.logger.Debugf("ProvideStageInput END") - // Checks which stage it is getting input for + // Checks which stage it is getting input for, and handles it. switch stage { case string(StageIDDeploy): - // input provided on this call overwrites the deployer configuration - // set at this plugin provider's instantiation - if r.deployInputAvailable { - r.lock.Unlock() - return fmt.Errorf("deployment information provided more than once") - } - var unserializedDeployerConfig any - var err error - if input["deploy"] != nil { - unserializedDeployerConfig, err = r.deployerRegistry.Schema().Unserialize(input["deploy"]) - if err != nil { - r.lock.Unlock() - return fmt.Errorf("invalid deployment information (%w)", err) - } - } else { - r.useLocalDeployer = true - } - // Make sure we transition the state before unlocking so there are no race conditions. - - r.deployInputAvailable = true - if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDDeploy { - r.state = step.RunningStepStateRunning - } - r.lock.Unlock() - // Feed the deploy step its input. - r.deployInput <- unserializedDeployerConfig - return nil + return r.provideDeployInput(input) case string(StageIDStarting): - if r.runInputAvailable { - r.lock.Unlock() - return fmt.Errorf("input provided more than once") - } - if input["input"] == nil { - r.lock.Unlock() - return fmt.Errorf("bug: invalid input for 'running' stage, expected 'input' field") - } - if _, err := r.stepSchema.Input().Unserialize(input["input"]); err != nil { - r.lock.Unlock() - return err - } - // Make sure we transition the state before unlocking so there are no race conditions. - r.runInputAvailable = true - if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDStarting { - r.state = step.RunningStepStateRunning - } - // Unlock before passing the data over the channel to prevent a deadlock. - // The other end of the channel needs to be unlocked to read the data. - r.lock.Unlock() - // Feed the run step its input over the channel. - r.runInput <- input["input"] - return nil + return r.provideStartingInput(input) case string(StageIDRunning): - r.lock.Unlock() return nil case string(StageIDCancelled): - if input["stop_if"] != false && input["stop_if"] != nil { - r.logger.Infof("Cancelling step %s", r.step) - r.cancel() // This should cancel the plugin deployment or execution. 
- } - r.lock.Unlock() - return nil + return r.provideCancelledInput(input) case string(StageIDDeployFailed): - r.lock.Unlock() return nil case string(StageIDCrashed): - r.lock.Unlock() return nil case string(StageIDOutput): - r.lock.Unlock() return nil default: - r.lock.Unlock() return fmt.Errorf("bug: invalid stage: %s", stage) } } +func (r *runningStep) provideDeployInput(input map[string]any) error { + // Note: The calling function must have the step mutex locked + // input provided on this call overwrites the deployer configuration + // set at this plugin provider's instantiation + if r.deployInputAvailable { + return fmt.Errorf("deployment information provided more than once") + } + var unserializedDeployerConfig any + var err error + if input["deploy"] != nil { + unserializedDeployerConfig, err = r.deployerRegistry.Schema().Unserialize(input["deploy"]) + if err != nil { + return fmt.Errorf("invalid deployment information (%w)", err) + } + } else { + r.useLocalDeployer = true + } + // Make sure we transition the state before unlocking so there are no race conditions. + + r.deployInputAvailable = true + if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDDeploy { + r.state = step.RunningStepStateRunning + } + + // Feed the deploy step its input. + select { + case r.deployInput <- unserializedDeployerConfig: + default: + return fmt.Errorf("unable to provide input to deploy stage for step %s/%s", r.runID, r.pluginStepID) + } + return nil +} + +func (r *runningStep) provideStartingInput(input map[string]any) error { + // Note: The calling function must have the step mutex locked + if r.runInputAvailable { + return fmt.Errorf("input provided more than once") + } + // Ensure input is given + if input["input"] == nil { + return fmt.Errorf("bug: invalid input for 'running' stage, expected 'input' field") + } + // Validate the input by unserializing it + if _, err := r.stepSchema.Input().Unserialize(input["input"]); err != nil { + return err + } + // Make sure we transition the state before unlocking so there are no race conditions. + r.runInputAvailable = true + + // Unlock before passing the data over the channel to prevent a deadlock. + // The other end of the channel needs to be unlocked to read the data. + + // Feed the run step its input over the channel. + select { + case r.runInput <- input["input"]: + default: + return fmt.Errorf("unable to provide input to run stage for step %s/%s", r.runID, r.pluginStepID) + } + return nil +} + +func (r *runningStep) provideCancelledInput(input map[string]any) error { + // Note: The calling function must have the step mutex locked + // Cancel if the step field is present and isn't false + if input["stop_if"] != false && input["stop_if"] != nil { + r.cancelStep() + } + return nil +} + +// cancelStep gracefully requests cancellation for any stage. +// If running, it sends a cancel signal if the plugin supports it. +func (r *runningStep) cancelStep() { + r.logger.Infof("Cancelling step %s/%s", r.runID, r.pluginStepID) + // We only need to call the signal if the step is running. + // If it isn't, cancelling the context alone should be enough. + if r.currentStage == StageIDRunning { + // Verify that the step has a cancel signal + cancelSignal := r.stepSchema.SignalHandlers()[plugin.CancellationSignalSchema.ID()] + if cancelSignal == nil { + r.logger.Errorf("could not cancel step %s/%s. 
Does not contain cancel signal receiver.", r.runID, r.pluginStepID) + } else if err := plugin.CancellationSignalSchema.DataSchema().ValidateCompatibility(cancelSignal.DataSchema()); err != nil { + r.logger.Errorf("validation failed for cancel signal for step %s/%s: %s", r.runID, r.pluginStepID, err) + } else { + // Validated. Now call the signal. + r.signalToStep <- schema.Input{RunID: r.runID, ID: cancelSignal.ID(), InputData: map[any]any{}} + } + } + // Now cancel the context to stop the non-running parts of the step + r.cancel() +} + +// ForceClose closes the step without waiting for a graceful shutdown of the ATP client. +// Warning: This means that it won't wait for the ATP client to finish. This is okay if using a deployer that +// will stop execution once the deployer closes it. +func (r *runningStep) ForceClose() error { + err := r.closeComponents(false) + // Wait for the run to finish to ensure that it's not running after closing. + r.wg.Wait() + r.closed = true + r.logger.Warningf("Step %s/%s force closed.", r.runID, r.pluginStepID) + return err +} + func (r *runningStep) Close() error { + err := r.closeComponents(true) + // Wait for the run to finish to ensure that it's not running after closing. + r.wg.Wait() + r.closed = true + return err +} + +func (r *runningStep) closeComponents(closeATP bool) error { r.cancel() r.lock.Lock() + if r.closed { + return nil // Already closed + } + var atpErr error + var containerErr error + if r.atpClient != nil && closeATP { + atpErr = r.atpClient.Close() + } if r.container != nil { - if err := r.container.Close(); err != nil { - return fmt.Errorf("failed to stop container (%w)", err) - } + containerErr = r.container.Close() } r.container = nil r.lock.Unlock() - <-r.done + if containerErr != nil { + return fmt.Errorf("error while stopping container (%w)", containerErr) + } else if atpErr != nil { + return fmt.Errorf("error while stopping atp client (%w)", atpErr) + // Do not wait in this case. It may never get resolved. + } return nil } func (r *runningStep) run() { + r.wg.Add(1) // Wait for the run to finish before closing. defer func() { - r.cancel() - close(r.done) + r.cancel() // Close before WaitGroup done + r.wg.Done() // Done. Close may now exit. 
}() container, err := r.deployStage() if err != nil { r.deployFailed(err) return } r.lock.Lock() select { case <-r.ctx.Done(): if err := container.Close(); err != nil { - r.logger.Warningf("failed to remove deployed container for step %s", r.step) + r.logger.Warningf("failed to remove deployed container for step %s/%s", r.runID, r.pluginStepID) } r.lock.Unlock() return default: r.container = container } r.lock.Unlock() + r.logger.Debugf("Successfully deployed container with ID '%s' for step %s/%s", container.ID(), r.runID, r.pluginStepID) if err := r.startStage(container); err != nil { r.startFailed(err) return } if err := r.runStage(); err != nil { r.runFailed(err) } } func (r *runningStep) deployStage() (deployer.Plugin, error) { + r.logger.Debugf("Deploying stage for step %s/%s", r.runID, r.pluginStepID) r.lock.Lock() - if !r.deployInputAvailable { - r.state = step.RunningStepStateWaitingForInput - } else { - r.state = step.RunningStepStateRunning - } + r.state = step.RunningStepStateRunning deployInputAvailable := r.deployInputAvailable r.lock.Unlock() @@ -646,24 +748,39 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) { nil, string(StageIDDeploy), deployInputAvailable, + &r.wg, ) var deployerConfig any var useLocalDeployer bool + // First, non-blocking retrieval select { case deployerConfig = <-r.deployInput: r.lock.Lock() r.state = step.RunningStepStateRunning - useLocalDeployer = r.useLocalDeployer r.lock.Unlock() - case <-r.ctx.Done(): - return nil, fmt.Errorf("step closed before deployment config could be obtained") + default: // Default, so it doesn't block on this receive + // It's waiting now. + r.lock.Lock() + r.state = step.RunningStepStateWaitingForInput + r.lock.Unlock() + select { + case deployerConfig = <-r.deployInput: + r.lock.Lock() + r.state = step.RunningStepStateRunning + r.lock.Unlock() + case <-r.ctx.Done(): + return nil, fmt.Errorf("step closed before deployment config could be obtained") + } } + r.lock.Lock() + useLocalDeployer = r.useLocalDeployer + r.lock.Unlock() var stepDeployer = r.localDeployer if !useLocalDeployer { var err error - stepDeployer, err = r.deployerRegistry.Create(deployerConfig, r.logger) + stepDeployer, err = r.deployerRegistry.Create(deployerConfig, r.logger.WithLabel("source", "deployer")) if err != nil { return nil, err } @@ -675,22 +792,23 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) { return container, nil } -type executionResult struct { - outputID string - outputData any - err error -} - func (r *runningStep) startStage(container deployer.Plugin) error { + r.logger.Debugf("Starting stage for step %s/%s", r.runID, r.pluginStepID) r.lock.Lock() previousStage := string(r.currentStage) r.currentStage = StageIDStarting + inputReceivedEarly := false - if !r.runInputAvailable { - r.state = step.RunningStepStateWaitingForInput - } else { + var runInput any + select { + case runInput = <-r.runInput: + // Good. It received it immediately. r.state = step.RunningStepStateRunning + inputReceivedEarly = true + default: // The default makes it not wait.
+ r.state = step.RunningStepStateWaitingForInput } + runInputAvailable := r.runInputAvailable r.lock.Unlock() @@ -701,215 +819,178 @@ func (r *runningStep) startStage(container deployer.Plugin) error { nil, string(StageIDStarting), runInputAvailable, + &r.wg, ) r.lock.Lock() r.currentStage = StageIDStarting - r.state = step.RunningStepStateWaitingForInput + r.logger.Debugf("Waiting for input state while starting 2.") r.lock.Unlock() - var runInput any - select { - case runInput = <-r.runInput: + // First, try to retrieve the runInput without blocking. + // If it is not yet available, set the state to waiting for input and do a blocking receive. + // If it is available, continue. + if !inputReceivedEarly { + // Input is not yet available. Now waiting. r.lock.Lock() - r.state = step.RunningStepStateRunning + if r.state != step.RunningStepStateWaitingForInput { + r.logger.Warningf("State not waiting for input when receiving from channel.") + } r.lock.Unlock() - case <-r.ctx.Done(): - return fmt.Errorf("step closed while waiting for run configuration") + + // Do a blocking wait for input now. + select { + case runInput = <-r.runInput: + r.lock.Lock() + r.state = step.RunningStepStateRunning + r.lock.Unlock() + case <-r.ctx.Done(): + return fmt.Errorf("step closed while waiting for run configuration") + } } - atpClient := atp.NewClientWithLogger(container, r.logger) + r.atpClient = atp.NewClientWithLogger(container, r.logger) - inputSchema, err := atpClient.ReadSchema() + inputSchema, err := r.atpClient.ReadSchema() if err != nil { return err } steps := inputSchema.Steps() - stepSchema, ok := steps[r.step] + stepSchema, ok := steps[r.pluginStepID] if !ok { - return fmt.Errorf("schema mismatch between local and remote deployed plugin, no stepSchema named %s found in remote", r.step) + return fmt.Errorf("error in run step %s: schema mismatch between local and remote deployed plugin, no stepSchema named %s found in remote", r.runID, r.pluginStepID) } + // Re-verify input. This should have also been done earlier. if _, err := stepSchema.Input().Unserialize(runInput); err != nil { - return fmt.Errorf("schema mismatch between local and remote deployed plugin, unserializing input failed (%w)", err) + return fmt.Errorf("schema mismatch between local and remote deployed plugin in step %s/%s, unserializing input failed (%w)", r.runID, r.pluginStepID, err) } - // Runs the ATP client in a goroutine in order to wait for it or context done. - // On context done, the deployer tries to end execution. That will shut down - // (with sigterm) the container. Then wait for output, or error out. + // Runs the ATP client in a goroutine in order to wait for it. + // On context done, the deployer has 30 seconds before it will error out.
go func() { - outputID, outputData, err := atpClient.Execute(r.step, runInput) - r.executionChannel <- executionResult{outputID, outputData, err} + result := r.atpClient.Execute( + schema.Input{RunID: r.runID, ID: r.pluginStepID, InputData: runInput}, + r.signalToStep, + r.signalFromStep, + ) + r.executionChannel <- result + if err = r.atpClient.Close(); err != nil { + r.logger.Warningf("Error while closing ATP client: %s", err) + } }() return nil } func (r *runningStep) runStage() error { - r.lock.Lock() - previousStage := string(r.currentStage) - r.currentStage = StageIDRunning - r.state = step.RunningStepStateRunning - r.lock.Unlock() - - r.stageChangeHandler.OnStageChange( - r, - &previousStage, - nil, - nil, - string(StageIDRunning), - false, - ) + r.logger.Debugf("Running stage for step %s/%s", r.runID, r.pluginStepID) + r.transitionStage(StageIDRunning, step.RunningStepStateRunning) - var result executionResult + var result atp.ExecutionResult select { case result = <-r.executionChannel: - if result.err != nil { - return result.err + if result.Error != nil { + return result.Error } case <-r.ctx.Done(): - // In this case, it is being instructed to stop. + // In this case, it is being instructed to stop. A signal should have been sent. // Shutdown (with sigterm) the container, then wait for the output (valid or error). - r.logger.Debugf("Running step context done before step run complete. Cancelling and waiting for result.") - r.cancel() - // If necessary, you can add a timeout here for shutdowns that take too long. - result = <-r.executionChannel - } - - // Execution complete, move to finished stage. - r.lock.Lock() - // Be careful that everything here is set correctly. - // Else it will cause undesired behavior. - previousStage = string(r.currentStage) - r.currentStage = StageIDOutput - // First running, then state change, then finished. - // This is so it properly steps through all the stages it needs to. - r.state = step.RunningStepStateRunning - r.lock.Unlock() + r.logger.Debugf("Got step context done before step run complete. Waiting up to 30 seconds for result.") + select { + case result = <-r.executionChannel: + // Successfully stopped before end of timeout. + case <-time.After(time.Duration(30) * time.Second): + r.logger.Warningf("Step %s/%s did not complete within the 30 second time limit. Force closing container.", + r.runID, r.pluginStepID) + if err := r.ForceClose(); err != nil { + r.logger.Warningf("Error in step %s/%s while closing plugin container (%s)", r.runID, r.pluginStepID, err) + } + } - r.stageChangeHandler.OnStageChange( - r, - &previousStage, - nil, - nil, - string(r.currentStage), - false) + } - r.lock.Lock() - r.state = step.RunningStepStateFinished - r.lock.Unlock() - r.stageChangeHandler.OnStepComplete( - r, - string(r.currentStage), - &result.outputID, - &result.outputData, - ) + // Execution complete, move to state running stage outputs, then to state finished stage. + r.transitionStage(StageIDOutput, step.RunningStepStateRunning) + r.completeStage(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData) return nil } func (r *runningStep) deployFailed(err error) { - r.lock.Lock() - previousStage := string(r.currentStage) - r.currentStage = StageIDDeployFailed - // Don't forget to update this, or else it will behave very oddly. - // First running, then finished. You can't skip states.
- r.state = step.RunningStepStateRunning - r.lock.Unlock() - - r.stageChangeHandler.OnStageChange( - r, - &previousStage, - nil, - nil, - string(StageIDDeployFailed), - false, - ) - r.logger.Warningf("Plugin %s deploy failed. %v", r.step, err) + r.logger.Debugf("Deploy failed stage for step %s/%s", r.runID, r.pluginStepID) + r.transitionStage(StageIDDeployFailed, step.RunningStepStateRunning) + r.logger.Warningf("Plugin step %s/%s deploy failed. %v", r.runID, r.pluginStepID, err) // Now it's done. - r.lock.Lock() - r.currentStage = StageIDDeployFailed - r.state = step.RunningStepStateFinished - r.lock.Unlock() - outputID := errorStr output := any(DeployFailed{ Error: err.Error(), }) - r.stageChangeHandler.OnStepComplete( - r, - string(r.currentStage), - &outputID, - &output, - ) + r.completeStage(StageIDDeployFailed, step.RunningStepStateFinished, &outputID, &output) } func (r *runningStep) startFailed(err error) { - r.lock.Lock() - previousStage := string(r.currentStage) - r.currentStage = StageIDCrashed - r.lock.Unlock() - - r.stageChangeHandler.OnStageChange( - r, - &previousStage, - nil, - nil, - string(r.currentStage), - false) - r.logger.Warningf("Plugin step %s start failed. %v", r.step, err) + r.logger.Debugf("Start failed stage for step %s/%s", r.runID, r.pluginStepID) + r.transitionStage(StageIDCrashed, step.RunningStepStateRunning) + r.logger.Warningf("Plugin step %s/%s start failed. %v", r.runID, r.pluginStepID, err) // Now it's done. - r.lock.Lock() - r.currentStage = StageIDCrashed - r.state = step.RunningStepStateFinished - r.lock.Unlock() - outputID := errorStr output := any(Crashed{ Output: err.Error(), }) - r.stageChangeHandler.OnStepComplete( - r, - string(r.currentStage), - &outputID, - &output, - ) + + r.completeStage(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output) } func (r *runningStep) runFailed(err error) { + r.logger.Debugf("Run failed stage for step %s/%s", r.runID, r.pluginStepID) + r.transitionStage(StageIDCrashed, step.RunningStepStateRunning) + r.logger.Warningf("Plugin step %s/%s run failed. %v", r.runID, r.pluginStepID, err) + + // Now it's done. + outputID := errorStr + output := any(Crashed{ + Output: err.Error(), + }) + r.completeStage(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output) +} + +// TransitionStage transitions the stage to the specified stage, and the state to the specified state. +// +//nolint:unparam +func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepState) { // A current lack of observability into the atp client prevents // non-fragile testing of this function. - r.lock.Lock() previousStage := string(r.currentStage) - r.currentStage = StageIDCrashed + r.currentStage = newStage // Don't forget to update this, or else it will behave very oddly. - // First running, then finished. You can't skip states. r.state = step.RunningStepStateRunning + // First running, then finished. You can't skip states. + r.state = state r.lock.Unlock() - r.stageChangeHandler.OnStageChange( r, &previousStage, nil, nil, - string(r.currentStage), - false) - - r.logger.Warningf("Plugin step %s run failed. %v", r.step, err) + string(newStage), + false, + &r.wg, + ) +} - // Now it's done. 
+//nolint:unparam +func (r *runningStep) completeStage(currentStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) { r.lock.Lock() - r.currentStage = StageIDCrashed - r.state = step.RunningStepStateFinished + previousStage := string(r.currentStage) + r.currentStage = currentStage + r.state = state r.lock.Unlock() - outputID := errorStr - output := any(Crashed{ - Output: err.Error(), - }) r.stageChangeHandler.OnStepComplete( r, - string(r.currentStage), - &outputID, - &output, + previousStage, + outputID, + previousStageOutput, + &r.wg, ) } diff --git a/internal/step/plugin/provider_test.go b/internal/step/plugin/provider_test.go index de3d797f..2db5f39b 100644 --- a/internal/step/plugin/provider_test.go +++ b/internal/step/plugin/provider_test.go @@ -9,6 +9,7 @@ import ( "go.flow.arcalot.io/engine/internal/step" "go.flow.arcalot.io/engine/internal/step/plugin" testdeployer "go.flow.arcalot.io/testdeployer" + "sync" "testing" ) @@ -16,7 +17,7 @@ type deployFailStageChangeHandler struct { message chan string } -func (s *deployFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) { +func (s *deployFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) { } @@ -25,6 +26,7 @@ func (s *deployFailStageChangeHandler) OnStepComplete( previousStage string, previousStageOutputID *string, previousStageOutput *any, + _ *sync.WaitGroup, ) { if previousStage != string(plugin.StageIDDeployFailed) { panic(fmt.Errorf("invalid previous stage: %s", previousStage)) @@ -47,7 +49,7 @@ type startFailStageChangeHandler struct { message chan string } -func (s *startFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) { +func (s *startFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) { } @@ -56,6 +58,7 @@ func (s *startFailStageChangeHandler) OnStepComplete( previousStage string, previousStageOutputID *string, previousStageOutput *any, + _ *sync.WaitGroup, ) { if previousStage != string(plugin.StageIDCrashed) { panic(fmt.Errorf("invalid previous stage: %s", previousStage)) @@ -79,7 +82,7 @@ type stageChangeHandler struct { message chan string } -func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) { +func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) { } @@ -88,6 +91,7 @@ func (s *stageChangeHandler) OnStepComplete( previousStage string, previousStageOutputID *string, previousStageOutput *any, + _ *sync.WaitGroup, ) { if previousStage != string(plugin.StageIDOutput) { panic(fmt.Errorf("invalid previous stage: %s", previousStage)) @@ -141,8 +145,12 @@ func TestProvider_Utility(t *testing.T) { _, err = runnable.Lifecycle(map[string]any{"step": "wait"}) assert.NoError(t, err) - _, err = runnable.Lifecycle(map[string]any{"step": nil}) + _, err = runnable.Lifecycle(map[string]any{"step": "hello"}) assert.NoError(t, err) + + // There is more than one step, so no specified one will cause an error. 
+ _, err = runnable.Lifecycle(map[string]any{"step": nil})
+ assert.Error(t, err)
 }
 
 func TestProvider_HappyError(t *testing.T) {
 @@ -182,11 +190,11 @@ func TestProvider_HappyError(t *testing.T) {
 }
 
 // start with a step id that is not in the schema
- _, err = runnable.Start(map[string]any{"step": "wrong_stepid"}, handler)
+ _, err = runnable.Start(map[string]any{"step": "wrong_stepid"}, t.Name(), handler)
 assert.Error(t, err)
 
- // default step id
- running, err := runnable.Start(map[string]any{"step": nil}, handler)
+ // wait step
+ running, err := runnable.Start(map[string]any{"step": "wait"}, t.Name(), handler)
 assert.NoError(t, err)
 
 // non-existent stage
 @@ -255,6 +263,59 @@ func TestProvider_HappyError(t *testing.T) {
 })
 }
 
+func TestProvider_VerifyCancelSignal(t *testing.T) {
+ logger := log.New(
+ log.Config{
+ Level: log.LevelError,
+ Destination: log.DestinationStdout,
+ },
+ )
+ workflowDeployerCfg := map[string]any{
+ "type": "test-impl",
+ }
+
+ deployerRegistry := deployer_registry.New(
+ deployer.Any(testdeployer.NewFactory()))
+
+ plp, err := plugin.New(
+ logger,
+ deployerRegistry,
+ workflowDeployerCfg,
+ )
+ assert.NoError(t, err)
+
+ runnable, err := plp.LoadSchema(
+ map[string]any{"plugin": "simulation"}, map[string][]byte{})
+ assert.NoError(t, err)
+ assert.NotNil(t, runnable)
+
+ waitLifecycle, err := runnable.Lifecycle(map[string]any{"step": "wait"})
+ assert.NoError(t, err)
+ // Verify that the expected lifecycle stage is there, then verify that stop_if is enabled, since wait has the cancel signal.
+ waitCancelledStageIDIndex := assert.SliceContainsExtractor(t,
+ func(schema step.LifecycleStageWithSchema) string {
+ return schema.ID
+ }, string(plugin.StageIDCancelled), waitLifecycle.Stages)
+ waitStageIDCancelled := waitLifecycle.Stages[waitCancelledStageIDIndex]
+ waitStopIfSchema := assert.MapContainsKey(t, "stop_if", waitStageIDCancelled.InputSchema)
+ if waitStopIfSchema.Disabled {
+ t.Fatalf("step wait's stop_if schema is disabled when the cancel signal is present.")
+ }
+
+ helloLifecycle, err := runnable.Lifecycle(map[string]any{"step": "hello"})
+ assert.NoError(t, err)
+ // Verify that the expected lifecycle stage is there, then verify that stop_if is disabled, since hello lacks the cancel signal.
+ helloCancelledStageIDIndex := assert.SliceContainsExtractor(t, + func(schema step.LifecycleStageWithSchema) string { + return schema.ID + }, string(plugin.StageIDCancelled), helloLifecycle.Stages) + helloStageIDCancelled := helloLifecycle.Stages[helloCancelledStageIDIndex] + helloStopIfSchema := assert.MapContainsKey(t, "stop_if", helloStageIDCancelled.InputSchema) + if !helloStopIfSchema.Disabled { + t.Fatalf("step hello's stop_if schema is not disabled when the cancel signal is not present.") + } +} + func TestProvider_DeployFail(t *testing.T) { logConfig := log.Config{ Level: log.LevelError, @@ -289,8 +350,8 @@ func TestProvider_DeployFail(t *testing.T) { message: make(chan string), } - // default step id - running, err := runnable.Start(map[string]any{"step": nil}, handler) + // wait step + running, err := runnable.Start(map[string]any{"step": "wait"}, t.Name(), handler) assert.NoError(t, err) assert.NoError(t, running.ProvideStageInput( @@ -353,7 +414,7 @@ func TestProvider_StartFail(t *testing.T) { message: make(chan string), } - running, err := runnable.Start(map[string]any{"step": "wait"}, handler) + running, err := runnable.Start(map[string]any{"step": "wait"}, t.Name(), handler) assert.NoError(t, err) // tell deployer that this run should not succeed diff --git a/internal/step/provider.go b/internal/step/provider.go index 3e173675..b8722486 100644 --- a/internal/step/provider.go +++ b/internal/step/provider.go @@ -2,6 +2,7 @@ package step import ( "go.flow.arcalot.io/pluginsdk/schema" + "sync" ) // Provider is the description of an item that fits in a workflow. Its implementation provide the @@ -42,7 +43,8 @@ type StageChangeHandler interface { previousStageOutputID *string, previousStageOutput *any, newStage string, - waitingForInput bool, + inputAvailable bool, + wg *sync.WaitGroup, ) // OnStepComplete is called when the step has completed a final stage in its lifecycle and communicates the output. @@ -52,6 +54,7 @@ type StageChangeHandler interface { previousStage string, previousStageOutputID *string, previousStageOutput *any, + wg *sync.WaitGroup, ) } @@ -67,6 +70,7 @@ type RunnableStep interface { // match the RunSchema. Start( input map[string]any, + runID string, stageChangeHandler StageChangeHandler, ) (RunningStep, error) } @@ -81,7 +85,7 @@ const ( RunningStepStateWaitingForInput RunningStepState = "waiting_for_input" // RunningStepStateRunning indicates that the step is working. RunningStepStateRunning RunningStepState = "running" - // RunningStepStateFinished indicates that the step has finished. + // RunningStepStateFinished indicates that the step has finished, including failure cases. RunningStepStateFinished RunningStepState = "finished" ) @@ -97,4 +101,6 @@ type RunningStep interface { State() RunningStepState // Close shuts down the step and cleans up the resources associated with the step. Close() error + // ForceClose shuts down the step forcefully. 
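+ // Unlike Close, it does not wait for the ATP client, and therefore the step, to finish.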
+ ForceClose() error
 }
 diff --git a/workflow/any.go b/workflow/any.go
 index 14000d6e..0040fa3c 100644
 --- a/workflow/any.go
 +++ b/workflow/any.go
 @@ -33,6 +33,15 @@ func (a *anySchemaWithExpressions) Validate(data any) error {
 return err
 }
 
+func (a *anySchemaWithExpressions) ValidateCompatibility(dataOrType any) error {
+ // An expression cannot be resolved before the workflow runs, so skip validation here.
+ if _, ok := dataOrType.(expressions.Expression); ok {
+ // Assume compatibility.
+ return nil
+ }
+ return a.anySchema.ValidateCompatibility(dataOrType)
+}
+
 func (a *anySchemaWithExpressions) Serialize(data any) (any, error) {
 return a.checkAndConvert(data)
 }
 diff --git a/workflow/executor.go b/workflow/executor.go
 index 070afe51..f1a4add4 100644
 --- a/workflow/executor.go
 +++ b/workflow/executor.go
 @@ -30,7 +30,7 @@ func NewExecutor(
 return nil, fmt.Errorf("bug: no step registry passed to NewExecutor")
 }
 return &executor{
- logger: logger,
+ logger: logger.WithLabel("source", "executor"),
 stepRegistry: stepRegistry,
 config: config,
 }, nil
 @@ -122,7 +122,15 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte
 return nil, err
 }
 
- // Stage 5: The output data model
+ // Stage 5: Verify stage inputs
+ // Now that the output properties and the internal data model are available, loop through the steps
+ // again to verify that all inputs are valid: all required inputs are present, their schemas match, etc.
+ // Do this by looping through the steps' inputs, then verifying that the DAG can provide them.
+ if err := e.verifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
+ return nil, err
+ }
+ // Stage 6: The output data model
 //goland:noinspection GoDeprecation
 if workflow.Output != nil {
 if len(workflow.Outputs) > 0 {
 @@ -277,6 +285,7 @@ func (e *executor) processSteps(
 nil,
 )
 }
+
 return runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, nil
 }
 
 @@ -323,6 +332,164 @@ func (e *executor) connectStepDependencies(
 return nil
 }
 
+// verifyWorkflowStageInputs verifies the schemas of the step inputs.
+func (e *executor) verifyWorkflowStageInputs(
+ workflow *Workflow,
+ workflowContext map[string][]byte,
+ stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema],
+ dag dgraph.DirectedGraph[*DAGItem],
+ internalDataModel *schema.ScopeSchema,
+) error {
+ // Loop through all the steps in the workflow
+ for stepID /* the step data value is unused */ := range workflow.Steps {
+ // Then loop through the stages of that step
+ lifecycle := stepLifecycles[stepID]
+ for _, stage := range lifecycle.Stages {
+ err := e.verifyStageInputs(dag, stepID, stage, workflowContext, internalDataModel)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (e *executor) verifyStageInputs(
+ dag dgraph.DirectedGraph[*DAGItem],
+ stepID string,
+ stage step.LifecycleStageWithSchema,
+ workflowContext map[string][]byte,
+ internalDataModel *schema.ScopeSchema,
+) error {
+ // First, get the parsed inputs of the stage
+ parsedInputs, err := e.getStageInputs(dag, stepID, stage)
+ if err != nil {
+ return err
+ }
+ // Next, loop through the input schema fields.
+ for name, stageInputSchema := range stage.InputSchema {
+ providedInputForField := parsedInputs[name]
+ // Check if the field is present in the stage data.
+ // If it is NOT present and NOT required, continue to the next field.
+ // If it is NOT present and IS required, fail.
+ // If it IS present, verify that the schema of the provided data is compatible with the input's schema.
+ // This runs before the workflow starts, so schemas can be checked, but most fields cannot yet be
+ // resolved to an actual value.
+ if providedInputForField == nil {
+ // not present
+ if stageInputSchema.RequiredValue {
+ return fmt.Errorf("required input %s of type %s not found for step %s",
+ name, stageInputSchema.TypeID(), stepID)
+ }
+ } else {
+ // It is present, so make sure it is compatible.
+ err := e.preValidateCompatibility(internalDataModel, providedInputForField, stageInputSchema, workflowContext)
+ if err != nil {
+ return fmt.Errorf("input validation failed for workflow step %s stage %s (%w)", stepID, stage.ID, err)
+ }
+ }
+ }
+ return nil
+}
+
+func (e *executor) getStageInputs(
+ dag dgraph.DirectedGraph[*DAGItem],
+ stepID string,
+ stage step.LifecycleStageWithSchema,
+) (map[string]any, error) {
+ currentStageNode, err := dag.GetNodeByID(GetStageNodeID(stepID, stage.ID))
+ if err != nil {
+ return nil, fmt.Errorf("bug: node for current stage not found (%w)", err)
+ }
+ // stageData provides the info needed for this node, without the expressions resolved.
+ stageData := currentStageNode.Item().Data
+
+ // Use reflection to convert the stage's input data to a readable map.
+ parsedInputs := make(map[string]any)
+ if stageData != nil {
+ v := reflect.ValueOf(stageData)
+ if v.Kind() != reflect.Map {
+ return nil, fmt.Errorf("could not validate input. Stage data is not a map. It is %s", v.Kind())
+ }
+
+ for _, reflectedKey := range v.MapKeys() {
+ if reflectedKey.Kind() != reflect.Interface {
+ return nil, fmt.Errorf("expected input key to be interface of a string. Got %s", reflectedKey.Kind())
+ }
+ // Now convert interface to string
+ key, ok := reflectedKey.Interface().(string)
+ if !ok {
+ return nil, fmt.Errorf("error converting input key to string")
+ }
+ value := v.MapIndex(reflectedKey).Interface()
+ parsedInputs[key] = value
+ }
+ }
+ return parsedInputs, nil
+}
+
+func (e *executor) preValidateCompatibility(rootSchema schema.Scope, inputField any, propertySchema *schema.PropertySchema,
+ workflowContext map[string][]byte) error {
+ // Get the type/value structure
+ inputTypeStructure, err := e.createTypeStructure(rootSchema, inputField, workflowContext)
+ if err != nil {
+ return err
+ }
+ // Now validate
+ return propertySchema.ValidateCompatibility(inputTypeStructure)
+}
+
+// createTypeStructure generates a structure of all the type information of the input field.
+// When the literal is known, it includes the original value.
+// When the literal is not known, but the schema is, it includes the type information instead.
+// When it encounters a map or list, it preserves it and recursively continues.
+func (e *executor) createTypeStructure(rootSchema schema.Scope, inputField any, workflowContext map[string][]byte) (any, error) {
+
+ // An expression's exact value may not be known yet, so get the type from it instead of evaluating it.
+ if expr, ok := inputField.(expressions.Expression); ok {
+ e.logger.Debugf("Resolving type of expression %s...", expr.String())
+ return expr.Type(rootSchema, workflowContext)
+ }
+
+ v := reflect.ValueOf(inputField)
+ switch v.Kind() {
+ case reflect.Slice:
+ // This is a list, so recursively construct the type structure of each element.
+
+ result := make([]any, v.Len())
+ for i := 0; i < v.Len(); i++ {
+ value := v.Index(i).Interface()
+ newValue, err := e.createTypeStructure(rootSchema, value, workflowContext)
+ if err != nil {
+ return nil, fmt.Errorf("failed to resolve expressions (%w)", err)
+ }
+ result[i] = newValue
+ }
+ return result, nil
+ case reflect.Map:
+ result := make(map[string]any, v.Len())
+ for _, reflectedKey := range v.MapKeys() {
+ key := reflectedKey.Interface()
+ keyAsStr, ok := key.(string)
+ if !ok {
+ return nil, fmt.Errorf("failed to generate type structure. Key is not of type string")
+ }
+ value := v.MapIndex(reflectedKey).Interface()
+ newValue, err := e.createTypeStructure(rootSchema, value, workflowContext)
+ if err != nil {
+ return nil, fmt.Errorf("failed to resolve expressions (%w)", err)
+ }
+ result[keyAsStr] = newValue
+ }
+ return result, nil
+ default:
+ // Not an expression, so it's actual data. Just return the input.
+ return inputField, nil
+ }
+}
+
 // buildInternalDataModel builds an internal data model that the expressions can query.
 func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperties map[string]*schema.PropertySchema) *schema.ScopeSchema {
 internalDataModel := schema.NewScopeSchema(
 diff --git a/workflow/executor_test.go b/workflow/executor_test.go
 index cec272dd..80fae2c4 100644
 --- a/workflow/executor_test.go
 +++ b/workflow/executor_test.go
 @@ -3,16 +3,76 @@ package workflow_test
 import (
 "context"
 "fmt"
+ "go.arcalot.io/assert"
+ "go.flow.arcalot.io/deployer"
+ deployerregistry "go.flow.arcalot.io/deployer/registry"
 "go.flow.arcalot.io/engine/config"
+ "go.flow.arcalot.io/engine/internal/step"
+ "go.flow.arcalot.io/engine/internal/step/plugin"
+ testimpl "go.flow.arcalot.io/testdeployer"
 "testing"
 
 "go.arcalot.io/lang"
 "go.arcalot.io/log/v2"
 "go.flow.arcalot.io/engine/internal/step/dummy"
- "go.flow.arcalot.io/engine/internal/step/registry"
+ stepregistry "go.flow.arcalot.io/engine/internal/step/registry"
 "go.flow.arcalot.io/engine/workflow"
 )
 
+func getTestImplPreparedWorkflow(t *testing.T, workflowDefinition string) (workflow.ExecutableWorkflow, error) {
+ logger := log.NewLogger(log.LevelDebug, log.NewTestWriter(t))
+ cfg := &config.Config{
+ LoggedOutputConfigs: map[string]*config.StepOutputLogConfig{
+ "terminated_early": {
+ LogLevel: log.LevelError,
+ },
+ },
+ }
+ stepRegistry := NewTestImplStepRegistry(logger, t)
+
+ executor := lang.Must2(workflow.NewExecutor(
+ logger,
+ cfg,
+ stepRegistry,
+ ))
+ wf := assert.NoErrorR[*workflow.Workflow](t)(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(workflowDefinition)))
+ return executor.Prepare(wf, map[string][]byte{})
+}
+
+func getDummyDeployerPreparedWorkflow(t *testing.T, workflowDefinition string) (workflow.ExecutableWorkflow, error) {
+ logger := log.NewLogger(log.LevelDebug, log.NewTestWriter(t))
+ cfg := &config.Config{}
+ stepRegistry := assert.NoErrorR[step.Registry](t)(stepregistry.New(
+ dummy.New(),
+ ))
+ executor := assert.NoErrorR[workflow.Executor](t)(workflow.NewExecutor(
+ logger,
+ cfg,
+ stepRegistry,
+ ))
+ wf := assert.NoErrorR[*workflow.Workflow](t)(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(workflowDefinition)))
+ return executor.Prepare(wf, map[string][]byte{})
+}
+
+func NewTestImplStepRegistry(
+ logger log.Logger,
+ t *testing.T,
+) step.Registry {
+ deployerRegistry := deployerregistry.New(
+ deployer.Any(testimpl.NewFactory()),
+ )
+
+ pluginProvider := assert.NoErrorR[step.Provider](t)(
+ plugin.New(logger, deployerRegistry, map[string]interface{}{ + "type": "test-impl", + "deploy_time": "0", + }), + ) + return assert.NoErrorR[step.Registry](t)(stepregistry.New( + pluginProvider, + )) +} + var sharedInputWorkflowYAML = `--- version: v0.1.0 input: @@ -27,6 +87,7 @@ input: steps: say_hi: kind: dummy + # Both name and nickname reference the same variable name: !expr $.input.name nickname: !expr $.input.name output: @@ -38,27 +99,9 @@ output: // and one step-output going into two step-outputs. // These cause duplicate connections to be made, which need to be handled properly. func TestSharedInput(t *testing.T) { - logger := log.NewLogger(log.LevelDebug, log.NewTestWriter(t)) - stepRegistry := lang.Must2(registry.New( - dummy.New(), - )) - - executor, err := workflow.NewExecutor(logger, &config.Config{}, stepRegistry) - if err != nil { - t.Fatalf("Failed to create Executor, %e", err) - } - - yamlConverter := workflow.NewYAMLConverter(stepRegistry) - decodedWorkflow, err := yamlConverter.FromYAML([]byte(sharedInputWorkflowYAML)) - if err != nil { - t.Fatalf("Failed to load workflow from YAML, %e", err) - } - - preparedWorkflow, err := executor.Prepare(decodedWorkflow, nil) - if err != nil { - t.Fatalf("Failed to prepare workflow, %e", err) - } - + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getDummyDeployerPreparedWorkflow(t, sharedInputWorkflowYAML), + ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() outputID, outputData, err := preparedWorkflow.Execute(ctx, map[string]any{ @@ -70,3 +113,109 @@ func TestSharedInput(t *testing.T) { fmt.Printf("%s: %s\n", outputID, outputData.(map[any]any)["message"]) // Output: success: Hello Arca Lot! } + +var missingInputWorkflowDefinition1 = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + incomplete_wait: + plugin: "n/a" + step: wait + # Missing input +outputs: + a: + b: !expr $.steps.incomplete_wait.outputs +` + +var missingInputWorkflowDefinition2 = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + say_hi: + kind: dummy + # Missing name +outputs: + a: + b: !expr $.steps.say_hi.greet +` + +func TestMissingInput(t *testing.T) { + // For this test, a workflow's step will be missing its inputs. 
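+ // Preparation should fail for both the test-impl provider and the dummy provider.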
+ _, err := getTestImplPreparedWorkflow(t, missingInputWorkflowDefinition1) + assert.Error(t, err) + + _, err = getDummyDeployerPreparedWorkflow(t, missingInputWorkflowDefinition2) + assert.Error(t, err) +} + +var mismatchedStepInputTypesWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + wait_1: + plugin: "n/a" + step: wait + input: + wait_time_ms: 0 + wait_2: + plugin: "n/a" + step: wait + input: + # Should fail during preparation, due to message being a string, and wait_time_ms expecting an int + wait_time_ms: !expr $.steps.wait_1.outputs.success.message +outputs: + a: + b: !expr $.steps.wait_2.outputs +` + +func TestMismatchedStepInputTypes(t *testing.T) { + _, err := getTestImplPreparedWorkflow(t, mismatchedStepInputTypesWorkflowDefinition) + assert.Error(t, err) +} + +var mismatchedInputTypesWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: + a: + display: + description: "Just for testing" + name: "a" + required: true + type: + type_id: string +steps: + wait_1: + plugin: "n/a" + step: wait + input: + # This is trying to put a string into an int field + wait_time_ms: !expr $.input.a +outputs: + a: + b: !expr $.steps.wait_1.outputs +` + +func TestMismatchedInputTypes(t *testing.T) { + _, err := getTestImplPreparedWorkflow(t, mismatchedInputTypesWorkflowDefinition) + assert.Error(t, err) +} diff --git a/workflow/model.go b/workflow/model.go index f5ff010c..5ff2b02a 100644 --- a/workflow/model.go +++ b/workflow/model.go @@ -229,7 +229,7 @@ type ErrNoMorePossibleSteps struct { // Error returns an explanation on why the error happened. func (e ErrNoMorePossibleSteps) Error() string { - var outputs []string //nolint:prealloc + var outputsUnmetDependencies []string //nolint:prealloc for _, node := range e.dag.ListNodes() { if node.Item().Kind != DAGItemKindOutput { continue @@ -242,7 +242,24 @@ func (e ErrNoMorePossibleSteps) Error() string { for i := range inbound { unmetDependencies = append(unmetDependencies, i) } - outputs = append(outputs, fmt.Sprintf("%s: %s", node.Item().OutputID, strings.Join(unmetDependencies, ", "))) + outputsUnmetDependencies = append( + outputsUnmetDependencies, + fmt.Sprintf("%s: %s", node.Item().OutputID, strings.Join(unmetDependencies, ", ")), + ) } - return fmt.Sprintf("no steps running, no more executable steps, cannot construct any output (outputs have the following dependencies: %s)", strings.Join(outputs, "; ")) + return fmt.Sprintf( + "no steps running, no more executable steps, cannot construct any output (outputs have the following dependencies: %s)", + strings.Join(outputsUnmetDependencies, "; "), + ) +} + +// ErrInvalidState indicates that the workflow failed due to an invalid state. +type ErrInvalidState struct { + processingSteps int + msg string +} + +// Error returns an explanation on why the error happened. +func (e ErrInvalidState) Error() string { + return fmt.Sprintf("Workflow failed due to invalid state (%s). 
Processing steps: %d", e.msg, e.processingSteps) } diff --git a/workflow/workflow.go b/workflow/workflow.go index 044f9cb2..9ffab54d 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "sync" + "time" "go.flow.arcalot.io/engine/config" @@ -59,7 +60,7 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s defer cancel() l := &loopState{ - logger: e.logger, + logger: e.logger.WithLabel("source", "workflow"), config: e.config, lock: &sync.Mutex{}, data: map[string]any{ @@ -73,8 +74,11 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s outputDone: false, cancel: cancel, workflowContext: e.workflowContext, + recentErrors: make(chan error, 20), // Big buffer in case there are lots of subsequent errors. } + l.lock.Lock() + // Iterate over all steps to set them up with proper handlers, then launch them. // Even though they're launched, the workflow won't execute until the input is provided. for stepID, runnableStep := range e.runnableSteps { @@ -92,36 +96,52 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s l.data["steps"].(map[string]any)[stepID] = stepDataModel var stageHandler step.StageChangeHandler = &stageChangeHandler{ - onStageChange: func(step step.RunningStep, previousStage *string, previousStageOutputID *string, previousStageOutput *any, stage string, waitingForInput bool) { + onStageChange: func( + step step.RunningStep, + previousStage *string, + previousStageOutputID *string, + previousStageOutput *any, + stage string, + inputAvailable bool, + wg *sync.WaitGroup, + ) { waitingForInputText := "" - if waitingForInput { + if !inputAvailable { waitingForInputText = " and is waiting for input" } - e.logger.Debugf("Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) - l.onStageComplete(stepID, previousStage, previousStageOutputID, previousStageOutput) + e.logger.Debugf("START Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) + l.onStageComplete(stepID, previousStage, previousStageOutputID, previousStageOutput, wg) + e.logger.Debugf("DONE Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) }, - onStepComplete: func(step step.RunningStep, previousStage string, previousStageOutputID *string, previousStageOutput *any) { + onStepComplete: func( + step step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, + wg *sync.WaitGroup, + ) { if previousStageOutputID != nil { e.logger.Debugf("Step %s completed with stage '%s', output '%s'...", stepID, previousStage, *previousStageOutputID) } else { e.logger.Debugf("Step %s completed with stage '%s'...", stepID, previousStage) } - l.onStageComplete(stepID, &previousStage, previousStageOutputID, previousStageOutput) + l.onStageComplete(stepID, &previousStage, previousStageOutputID, previousStageOutput, wg) }, } e.logger.Debugf("Launching step %s...", stepID) - runningStep, err := runnableStep.Start(e.stepRunData[stepID], stageHandler) + runningStep, err := runnableStep.Start(e.stepRunData[stepID], stepID, stageHandler) if err != nil { return "", nil, fmt.Errorf("failed to launch step %s (%w)", stepID, err) } l.runningSteps[stepID] = runningStep } + l.lock.Unlock() // Let's make sure we are closing all steps once this function terminates so we don't leave stuff running. 
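+ // ForceClose is used below instead of Close so that shutdown does not block waiting for the ATP client to finish.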
defer func() { e.logger.Debugf("Terminating all steps...") for stepID, runningStep := range l.runningSteps { e.logger.Debugf("Terminating step %s...", stepID) - if err := runningStep.Close(); err != nil { + if err := runningStep.ForceClose(); err != nil { panic(fmt.Errorf("failed to close step %s (%w)", stepID, err)) } } @@ -171,9 +191,10 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s } return outputDataEntry.outputID, outputData, nil case <-ctx.Done(): - e.logger.Debugf("Workflow execution aborted. %s", l.lastError) - if l.lastError != nil { - return "", nil, l.lastError + lastErr := l.getLastError() + e.logger.Debugf("Workflow execution aborted. %s", lastErr) + if lastErr != nil { + return "", nil, lastErr } return "", nil, fmt.Errorf("workflow execution aborted (%w)", ctx.Err()) } @@ -197,14 +218,42 @@ type loopState struct { runningSteps map[string]step.RunningStep outputDataChannel chan outputDataType outputDone bool - lastError error + recentErrors chan error cancel context.CancelFunc workflowContext map[string][]byte } -func (l *loopState) onStageComplete(stepID string, previousStage *string, previousStageOutputID *string, previousStageOutput *any) { +// getLastError gathers the last errors. If there are several, it creates a new one that consolidates them. +// This will read from the channel. Calling again will only gather new errors since the last call. +func (l *loopState) getLastError() error { + var errors []error +errGatherLoop: + for { + select { + case err := <-l.recentErrors: + errors = append(errors, err) + default: + break errGatherLoop // No more errors + } + } + switch len(errors) { + case 0: + return nil + case 1: + return errors[0] + default: + return fmt.Errorf("multiple errors: %v", errors) + } +} + +func (l *loopState) onStageComplete(stepID string, previousStage *string, previousStageOutputID *string, previousStageOutput *any, wg *sync.WaitGroup) { l.lock.Lock() - defer l.lock.Unlock() + defer func() { + if previousStage != nil { + l.checkForDeadlocks(3, wg) + } + l.lock.Unlock() + }() if previousStage == nil { return @@ -212,13 +261,14 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo stageNode, err := l.dag.GetNodeByID(GetStageNodeID(stepID, *previousStage)) if err != nil { l.logger.Errorf("Failed to get stage node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) - l.lastError = fmt.Errorf("failed to get stage node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) + l.recentErrors <- fmt.Errorf("failed to get stage node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) l.cancel() return } + l.logger.Debugf("Removed node '%s' from the DAG", stageNode.ID()) if err := stageNode.Remove(); err != nil { l.logger.Errorf("Failed to remove stage node ID %s (%w)", stageNode.ID(), err) - l.lastError = fmt.Errorf("failed to remove stage node ID %s (%w)", stageNode.ID(), err) + l.recentErrors <- fmt.Errorf("failed to remove stage node ID %s (%w)", stageNode.ID(), err) l.cancel() return } @@ -226,13 +276,15 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo outputNode, err := l.dag.GetNodeByID(GetOutputNodeID(stepID, *previousStage, *previousStageOutputID)) if err != nil { l.logger.Errorf("Failed to get output node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) - l.lastError = fmt.Errorf("failed to get output node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) + l.recentErrors <- fmt.Errorf("failed to get output node ID %s 
(%w)", GetStageNodeID(stepID, *previousStage), err) l.cancel() return } + // Removes the node from the DAG. This results in the nodes not having inbound connections, allowing them to be processed. + l.logger.Debugf("Removed node '%s' from the DAG", outputNode.ID()) if err := outputNode.Remove(); err != nil { l.logger.Errorf("Failed to remove output node ID %s (%w)", outputNode.ID(), err) - l.lastError = fmt.Errorf("failed to remove output node ID %s (%w)", outputNode.ID(), err) + l.recentErrors <- fmt.Errorf("failed to remove output node ID %s (%w)", outputNode.ID(), err) l.cancel() return } @@ -257,7 +309,7 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo // notifySteps is a function we can call to go through all DAG nodes that have no inbound connections and // provide step inputs based on expressions. -// The lock should be acquired when this is called. +// The lock should be acquired by the caller before this is called. func (l *loopState) notifySteps() { //nolint:gocognit // This function goes through the DAG and feeds the input to all steps that have no further inbound // dependencies. @@ -302,33 +354,33 @@ func (l *loopState) notifySteps() { //nolint:gocognit // Tries to match the schema if _, err := node.Item().DataSchema.Unserialize(untypedInputData); err != nil { l.logger.Errorf("Bug: schema evaluation resulted in invalid data for %s (%v)", node.ID(), err) - l.lastError = fmt.Errorf("bug: schema evaluation resulted in invalid data for %s (%w)", node.ID(), err) + l.recentErrors <- fmt.Errorf("bug: schema evaluation resulted in invalid data for %s (%w)", node.ID(), err) l.cancel() return } // This check is here just to make sure it has the required fields set - if node.Item().StepID != "" && node.Item().StageID != "" { - stageInputData := untypedInputData.(map[any]any) - typedInputData := make(map[string]any, len(stageInputData)) - for k, v := range stageInputData { - typedInputData[k.(string)] = v - } - // Sends it to the plugin - l.logger.Debugf("Providing stage input for %s...", nodeID) - if err := l.runningSteps[node.Item().StepID].ProvideStageInput( - node.Item().StageID, - typedInputData, - ); err != nil { - l.logger.Errorf("Bug: failed to provide input to step %s (%w)", node.Item().StepID, err) - l.lastError = fmt.Errorf("bug: failed to provide input to step %s (%w)", node.Item().StepID, err) - l.cancel() - return - } - } else { + if node.Item().StepID == "" || node.Item().StageID == "" { // This shouldn't happen panic("Step or stage ID missing") } + + stageInputData := untypedInputData.(map[any]any) + typedInputData := make(map[string]any, len(stageInputData)) + for k, v := range stageInputData { + typedInputData[k.(string)] = v + } + // Sends it to the plugin + l.logger.Debugf("Providing stage input for %s...", nodeID) + if err := l.runningSteps[node.Item().StepID].ProvideStageInput( + node.Item().StageID, + typedInputData, + ); err != nil { + l.logger.Errorf("Bug: failed to provide input to step %s (%w)", node.Item().StepID, err) + l.recentErrors <- fmt.Errorf("bug: failed to provide input to step %s (%w)", node.Item().StepID, err) + l.cancel() + return + } case DAGItemKindOutput: // We have received enough data to construct the workflow output. l.logger.Debugf("Constructing workflow output.") @@ -354,17 +406,29 @@ func (l *loopState) notifySteps() { //nolint:gocognit } } } - // Here we make sure we don't have a deadlock. 
+}
+
+type stateCounters struct {
+ starting int
+ waitingWithInbound int
+ waitingWithoutInbound int
+ running int
+ finished int
+}
+
+func (l *loopState) countStates() stateCounters {
- counters := struct {
- starting int
- waiting int
- running int
- finished int
- }{
- 0,
- 0,
- 0,
- 0,
- }
+ var counters stateCounters
 for stepID, runningStep := range l.runningSteps {
 switch runningStep.State() {
 @@ -372,20 +436,25 @@
 counters.starting++
 l.logger.Debugf("Step %s is currently starting.", stepID)
 case step.RunningStepStateWaitingForInput:
- counters.waiting++
-
 connectionsMsg := ""
 dagNode, err := l.dag.GetNodeByID(GetStageNodeID(stepID, runningStep.CurrentStage()))
 switch {
 case err != nil:
 l.logger.Warningf("Failed to get DAG node for the debug message (%w)", err)
+ counters.waitingWithInbound++
 case dagNode == nil:
 l.logger.Warningf("Failed to get DAG node for the debug message. Returned nil", err)
+ counters.waitingWithInbound++
 default:
 inboundConnections, err := dagNode.ListInboundConnections()
 if err != nil {
 l.logger.Warningf("Error while listing inbound connections. (%w)", err)
 }
+ if len(inboundConnections) > 0 {
+ counters.waitingWithInbound++
+ } else {
+ counters.waitingWithoutInbound++
+ }
 
 i := 0
 for k := range inboundConnections {
 @@ -405,20 +474,38 @@
 l.logger.Debugf("Step %s is currently finished.", stepID)
 }
 }
+ return counters
+}
+
+func (l *loopState) checkForDeadlocks(retries int, wg *sync.WaitGroup) {
+ // Here we make sure we don't have a deadlock.
+ counters := l.countStates()
 l.logger.Infof(
- "There are currently %d steps starting, %d waiting for input, %d running, %d finished",
+ "There are currently %d steps starting, %d waiting for input, %d ready for input, %d running, %d finished",
 counters.starting,
- counters.waiting,
+ counters.waitingWithInbound,
+ counters.waitingWithoutInbound,
 counters.running,
 counters.finished,
 )
- if counters.starting == 0 && counters.running == 0 && !l.outputDone {
- l.lastError = &ErrNoMorePossibleSteps{
- l.dag,
+ if counters.starting == 0 && counters.running == 0 && counters.waitingWithoutInbound == 0 && !l.outputDone {
+ if retries <= 0 {
+ l.recentErrors <- &ErrNoMorePossibleSteps{
+ l.dag,
+ }
+ l.logger.Debugf("DAG:\n%s", l.dag.Mermaid())
+ l.cancel()
+ } else {
+ // Retry. There are times when all the steps are in a transition state.
+ // Retrying will delay the check until after they are done with the transition.
+ l.logger.Warningf("No running steps. Rechecking...")
+ wg.Add(1)
+ go func() {
+ time.Sleep(5 * time.Millisecond)
+ l.checkForDeadlocks(retries-1, wg)
+ wg.Done()
+ }()
 }
- l.logger.Errorf("%v", l.lastError)
- l.logger.Debugf("DAG:\n%s", l.dag.Mermaid())
- l.cancel()
 }
 }
 
 @@ -459,15 +546,44 @@
 }
 }
 
+// stageChangeHandler implements step.StageChangeHandler.
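+// It forwards each call to the corresponding handler function, passing along the WaitGroup used by the deadlock-check retries.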
type stageChangeHandler struct { - onStageChange func(step step.RunningStep, previousStage *string, previousStageOutputID *string, previousStageOutput *any, stage string, waitingForInput bool) - onStepComplete func(step step.RunningStep, previousStage string, previousStageOutputID *string, previousStageOutput *any) + onStageChange func( + step step.RunningStep, + previousStage *string, + previousStageOutputID *string, + previousStageOutput *any, + stage string, + waitingForInput bool, + wg *sync.WaitGroup, + ) + onStepComplete func( + step step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, + wg *sync.WaitGroup, + ) } -func (s stageChangeHandler) OnStageChange(step step.RunningStep, previousStage *string, previousStageOutputID *string, previousStageOutput *any, stage string, waitingForInput bool) { - s.onStageChange(step, previousStage, previousStageOutputID, previousStageOutput, stage, waitingForInput) +func (s stageChangeHandler) OnStageChange( + step step.RunningStep, + previousStage *string, + previousStageOutputID *string, + previousStageOutput *any, + stage string, + waitingForInput bool, + wg *sync.WaitGroup, +) { + s.onStageChange(step, previousStage, previousStageOutputID, previousStageOutput, stage, waitingForInput, wg) } -func (s stageChangeHandler) OnStepComplete(step step.RunningStep, previousStage string, previousStageOutputID *string, previousStageOutput *any) { - s.onStepComplete(step, previousStage, previousStageOutputID, previousStageOutput) +func (s stageChangeHandler) OnStepComplete( + step step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, + wg *sync.WaitGroup, +) { + s.onStepComplete(step, previousStage, previousStageOutputID, previousStageOutput, wg) } diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 7faff9de..f7f56254 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -3,21 +3,13 @@ package workflow_test import ( "context" "errors" - "testing" - "time" - - "go.flow.arcalot.io/deployer" - "go.flow.arcalot.io/engine/internal/step" - "go.flow.arcalot.io/engine/internal/step/plugin" - testimpl "go.flow.arcalot.io/testdeployer" - "go.arcalot.io/assert" "go.arcalot.io/lang" "go.arcalot.io/log/v2" - deployerregistry "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/config" - "go.flow.arcalot.io/engine/internal/step/dummy" - stepregistry "go.flow.arcalot.io/engine/internal/step/registry" + "testing" + "time" + "go.flow.arcalot.io/engine/workflow" ) @@ -41,26 +33,9 @@ output: ` func TestOutputFailed(t *testing.T) { - logConfig := log.Config{ - Level: log.LevelError, - Destination: log.DestinationStdout, - } - logger := log.New( - logConfig, + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getDummyDeployerPreparedWorkflow(t, badWorkflowDefinition), ) - cfg := &config.Config{ - Log: logConfig, - } - stepRegistry := lang.Must2(stepregistry.New( - dummy.New(), - )) - executor := lang.Must2(workflow.NewExecutor( - logger, - cfg, - stepRegistry, - )) - wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(badWorkflowDefinition))) - preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) _, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{"name": "Arca Lot"}) assert.Nil(t, outputData) assert.Error(t, err) @@ -89,64 +64,186 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 0 + # It needs to be long enough for it to 
ensure that long_wait is in a running state.
+ # The other case will be tested separately.
+ wait_time_ms: 20
 outputs:
 a:
 cancelled_step_output: !expr $.steps.long_wait.outputs
 `
 
-func NewTestImplStepRegistry(
- logger log.Logger,
- t *testing.T,
-) step.Registry {
- deployerRegistry := deployerregistry.New(
- deployer.Any(testimpl.NewFactory()),
+func TestStepCancellation(t *testing.T) {
+ // For this test, a simple workflow will run wait steps, with one that's
+ // supposed to be stopped when the first stops.
+ // The long one will be long enough that there is no reasonable way
+ // for it to finish before the first step.
+ // The test double deployer will be used for this test, as we
+ // need a deployer to test the plugin step provider.
+ preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)(
+ getTestImplPreparedWorkflow(t, stepCancellationWorkflowDefinition),
 )
+ outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
+ assert.NoError(t, err)
+ assert.Equals(t, outputID, "a")
+ stepResult := assert.MapContainsKeyAny(t, "cancelled_step_output", outputData.(map[any]any))
+ assert.MapContainsKey(t, "cancelled_early", stepResult.(map[string]any))
+}
+
+var earlyStepCancellationWorkflowDefinition = `
+version: v0.1.0
+input:
+ root: RootObject
+ objects:
+ RootObject:
+ id: RootObject
+ properties: {}
+steps:
+ # This one needs to run longer than the total time expected of all the other steps, with
+ # a large enough difference to prevent timing errors breaking the test.
+ end_wait:
+ plugin: "n/a"
+ step: wait
+ input:
+ wait_time_ms: 80
+ # The delay step needs to run long enough to ensure that last_step isn't running when it's cancelled by short_wait
+ delay:
+ plugin: "n/a"
+ step: wait
+ input:
+ wait_time_ms: 50
+ last_step:
+ plugin: "n/a"
+ step: wait
+ input:
+ wait_time_ms: 0
+ # Delay last_step so it doesn't run, and gets cancelled before deployment.
+ wait_for: !expr $.steps.delay.outputs
+ # You can verify that this test works by commenting out this line. It should fail.
+ stop_if: !expr $.steps.short_wait.outputs
+ short_wait:
+ plugin: "n/a"
+ step: wait
+ input:
+ # End the test quickly.
+ wait_time_ms: 0
+outputs:
+ # If not properly cancelled, fail_case will have output.
+ fail_case:
+ unattainable: !expr $.steps.last_step.outputs
+ correct_case:
+ a: !expr $.steps.end_wait.outputs
+`
 
- pluginProvider := assert.NoErrorR[step.Provider](t)(
- plugin.New(logger, deployerRegistry, map[string]interface{}{
- "type": "test-impl",
- "deploy_time": "0",
- }),
+func TestEarlyStepCancellation(t *testing.T) {
+ // For this test, a simple workflow will run wait steps, with last_step
+ // cancelled by short_wait's stop_if before it ever starts running.
+ // end_wait runs long enough that there is no reasonable way
+ // for it to finish before the cancellation takes effect.
+ // The test double deployer will be used for this test, as we
+ // need a deployer to test the plugin step provider.
+ preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)(
+ getTestImplPreparedWorkflow(t, earlyStepCancellationWorkflowDefinition),
 )
- return assert.NoErrorR[step.Registry](t)(stepregistry.New(
- pluginProvider,
- ))
+ startTime := time.Now() // Right before execute to not include pre-processing time.
+ outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
+ duration := time.Since(startTime)
+ t.Logf("Test execution time: %s", duration)
+ // A nil value means the output could not be constructed, which is intended, since we cancelled the step it depends on.
+ // If it's not nil, that means the step didn't get cancelled.
+ assert.NoError(t, err)
+ assert.Equals(t, outputID, "correct_case")
+ // end_wait, the longest step feeding the output, runs for only 80 ms, so 200 ms leaves time for processing.
+ assert.LessThan(t, duration.Milliseconds(), 200)
 }
 
-func TestStepCancellation(t *testing.T) {
- // For this test, a simple workflow will run wait steps, with one that's
- // supposed to be stopped when the first stops.
+var deploymentStepCancellationWorkflowDefinition = `
+version: v0.1.0
+input:
+ root: RootObject
+ objects:
+ RootObject:
+ id: RootObject
+ properties: {}
+steps:
+ # This one needs to run longer than the total time expected of all the other steps, with
+ # a large enough difference to prevent timing errors breaking the test.
+ end_wait:
+ plugin: "n/a"
+ step: wait
+ input:
+ wait_time_ms: 100
+ step_to_cancel:
+ plugin: "n/a"
+ step: wait
+ input:
+ wait_time_ms: 0
+ # You can verify that this test works by commenting out this line. It should fail.
+ stop_if: !expr $.steps.short_wait.outputs
+ # Deployment needs to take long enough to ensure that step_to_cancel is still deploying when it's cancelled by short_wait
+ deploy:
+ type: "test-impl"
+ deploy_time: 50 # 50 ms
+ short_wait:
+ plugin: "n/a"
+ step: wait
+ input:
+ # End the test quickly.
+ wait_time_ms: 0
+outputs:
+ # If not properly cancelled, fail_case will have output.
+ fail_case:
+ unattainable: !expr $.steps.step_to_cancel.outputs
+ correct_case:
+ a: !expr $.steps.end_wait.outputs
+`
+
+func TestDeploymentStepCancellation(t *testing.T) {
+ // For this test, a simple workflow will run wait steps, with one step cancelled while it is still deploying.
 // The long one will be long enough that there is no reasonable way
 // for it to finish before the first step.
 // The test double deployer will be used for this test, as we
 // need a deployer to test the plugin step provider.
- logConfig := log.Config{
- Level: log.LevelInfo,
- Destination: log.DestinationStdout,
- }
- logger := log.New(
- logConfig,
+ preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)(
+ getTestImplPreparedWorkflow(t, deploymentStepCancellationWorkflowDefinition),
 )
- cfg := &config.Config{
- Log: logConfig,
- }
- stepRegistry := NewTestImplStepRegistry(logger, t)
+ startTime := time.Now() // Right before execute to not include pre-processing time.
+ outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
+ duration := time.Since(startTime)
+ t.Logf("Test execution time: %s", duration)
+ // A nil value means the output could not be constructed, which is intended, since we cancelled the step it depends on.
+ // If it's not nil, that means the step didn't get cancelled.
+ assert.NoError(t, err)
+ assert.Equals(t, outputID, "correct_case")
+ // end_wait, the longest step feeding the output, runs for only 100 ms, so 200 ms leaves time for processing.
+ assert.LessThan(t, duration.Milliseconds(), 200) +} - executor := lang.Must2(workflow.NewExecutor( - logger, - cfg, - stepRegistry, - )) - wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(stepCancellationWorkflowDefinition))) - preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) - outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) +var simpleValidLiteralInputWaitWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + wait_1: + plugin: "n/a" + step: wait + input: + wait_time_ms: 0 +outputs: + a: + b: !expr $.steps.wait_1.outputs +` + +func TestSimpleValidWaitWorkflow(t *testing.T) { + // Just a single wait + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, simpleValidLiteralInputWaitWorkflowDefinition), + ) + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) assert.NoError(t, err) assert.Equals(t, outputID, "a") - stepResult := outputData.(map[interface{}]interface{})["cancelled_step_output"] - assert.NotNil(t, stepResult) - stepResultCancelledEarly := stepResult.(map[string]interface{})["cancelled_early"] - assert.NotNil(t, stepResultCancelledEarly) } var waitForSerialWorkflowDefinition = ` @@ -162,12 +259,13 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 500 + # Note: 5ms left only a 2.5ms margin for error. 10ms left almost 6ms. So 10ms min is recommended. + wait_time_ms: 10 second_wait: plugin: "n/a" step: wait input: - wait_time_ms: 500 + wait_time_ms: 10 wait_for: !expr $.steps.first_wait.outputs.success outputs: success: @@ -183,7 +281,6 @@ func TestWaitForSerial(t *testing.T) { // as each step runs for 5s and are run serially // The test double deployer will be used for this test, as we // need a deployer to test the plugin step provider. - startTime := time.Now() logConfig := log.Config{ Level: log.LevelInfo, Destination: log.DestinationStdout, @@ -203,6 +300,7 @@ func TestWaitForSerial(t *testing.T) { )) wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialWorkflowDefinition))) preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) + startTime := time.Now() // Right before execute to not include pre-processing time. outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) assert.NoError(t, err) assert.Equals(t, outputID, "success") @@ -217,20 +315,14 @@ func TestWaitForSerial(t *testing.T) { duration := time.Since(startTime) t.Logf("Test execution time: %s", duration) - var waitSuccess bool - if duration >= 1*time.Second { - waitSuccess = true - t.Logf("Test execution time is greater than 1 second, steps are running serially due to the wait_for condition.") + if duration >= 20*time.Millisecond { + t.Logf("Test execution time is greater than 20 milliseconds; steps are correctly running serially due to the wait_for condition.") } else { - waitSuccess = false - t.Logf("Test execution time is lesser than 1 seconds, steps are not running serially.") + t.Fatalf("Test execution time is less than 20 milliseconds; steps are not running serially.") } - assert.Equals(t, waitSuccess, true) } -// Running parallel steps which wait on the same previous step sometimes causes a race condition. This needs to be investigated. -// once the race condition if fixed reduce the wait_time to 500ms. 
-var waitForParallelWorkflowDefinition = `
+var missingInputsFailedDeploymentWorkflowDefinition = `
 version: v0.1.0
 input:
 root: RootObject
 objects:
 RootObject:
 id: RootObject
 properties: {}
 steps:
- first_wait:
+ wait_1:
 plugin: "n/a"
 step: wait
 input:
- wait_time_ms: 5000
- second_wait:
+ wait_time_ms: 0
+ deploy:
+ type: "test-impl"
+ #deploy_time: 20000 # 20 seconds
+ deploy_succeed: false
+ wait_2:
 plugin: "n/a"
 step: wait
+ wait_for: !expr $.steps.wait_1.outputs.success
 input:
- wait_time_ms: 5000
- wait_for: !expr $.steps.first_wait.outputs.success
- third_wait:
+ wait_time_ms: 0
+outputs:
+ a:
+ b: !expr $.steps.wait_2.outputs
+`
+
+func TestMissingInputsFailedDeployment(t *testing.T) {
+ // For this test, the workflow should fail, not deadlock: wait_1's deployment fails, so wait_2's input can never be resolved.
+ preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)(
+ getTestImplPreparedWorkflow(t, missingInputsFailedDeploymentWorkflowDefinition),
+ )
+ outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
+ assert.Error(t, err)
+ assert.Equals(t, outputID, "")
+}
+
+var missingInputsWrongOutputWorkflowDefinition = `
+version: v0.1.0
+input:
+ root: RootObject
+ objects:
+ RootObject:
+ id: RootObject
+ properties: {}
+steps:
+ wait_1:
 plugin: "n/a"
 step: wait
 input:
- wait_time_ms: 5000
- wait_for: !expr $.steps.first_wait.outputs.success
+ wait_time_ms: 0
+ wait_2:
+ plugin: "n/a"
+ step: wait
+ # wait_1 has no stop_if, so this output will never be emitted.
+ wait_for: !expr $.steps.wait_1.outputs.cancelled_early
+ input:
+ wait_time_ms: 0
 outputs:
- success:
- third_step_output: !expr $.steps.third_wait.outputs
- second_step_output: !expr $.steps.second_wait.outputs
+ a:
+ b: !expr $.steps.wait_2.outputs
 `
 
-func TestWaitForParallel(t *testing.T) {
- // For this test, a workflow runs three steps, where each step runs a wait step for 5s
- // The second and third wait steps wait for the first to succeed after which they both run in parallel
- // The total execution time for this test function should be greater than 5s but lesser than 15s
- // as the first step runs for 5s and other two steps run in parallel after the first succeeds
- // The test double deployer will be used for this test, as we
- // need a deployer to test the plugin step provider.
- startTime := time.Now()
- logConfig := log.Config{
- Level: log.LevelInfo,
- Destination: log.DestinationStdout,
- }
- logger := log.New(
- logConfig,
+func TestMissingInputsWrongOutput(t *testing.T) {
+ // For this test, the workflow should fail, not deadlock: wait_1 never emits cancelled_early, so wait_2's input can never be resolved.
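+ // cancelled_early is a real output in the wait step's schema, so preparation succeeds; the failure only surfaces at execution time.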
+ preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, missingInputsWrongOutputWorkflowDefinition), ) - cfg := &config.Config{ - Log: logConfig, - } - stepRegistry := NewTestImplStepRegistry(logger, t) - - executor := lang.Must2(workflow.NewExecutor( - logger, - cfg, - stepRegistry, - )) - wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForParallelWorkflowDefinition))) - preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) - outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) - assert.NoError(t, err) - assert.Equals(t, outputID, "success") - stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"] - assert.NotNil(t, stepResult2) - stepResult3 := outputData.(map[interface{}]interface{})["third_step_output"] - assert.NotNil(t, stepResult3) - t.Log(stepResult3) - - duration := time.Since(startTime) - t.Logf("Test execution time: %s", duration) - var waitSuccess bool - if duration > 10*time.Second && duration < 20*time.Second { - waitSuccess = true - t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.") - } else { - waitSuccess = false - t.Logf("Steps second_wait and third_wait are not running in parallel.") - } - assert.Equals(t, waitSuccess, true) + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.Error(t, err) + assert.Equals(t, outputID, "") }