diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go
index 83cea9af..967ebf90 100644
--- a/cmd/arcaflow/main.go
+++ b/cmd/arcaflow/main.go
@@ -133,7 +133,7 @@ Options:
 	}
 	cfg.Log.Stdout = os.Stderr
 
-	logger := log.New(cfg.Log)
+	logger := log.New(cfg.Log).WithLabel("source", "main")
 
 	dirContext, err := loadContext(dir)
 	if err != nil {
diff --git a/cmd/run-plugin/run.go b/cmd/run-plugin/run.go
index be241dee..f3a19138 100644
--- a/cmd/run-plugin/run.go
+++ b/cmd/run-plugin/run.go
@@ -6,7 +6,10 @@ import (
 	"flag"
 	"fmt"
 	"go.flow.arcalot.io/deployer"
+	"go.flow.arcalot.io/pluginsdk/plugin"
+	"go.flow.arcalot.io/pluginsdk/schema"
 	podman "go.flow.arcalot.io/podmandeployer"
+	pythondeployer "go.flow.arcalot.io/pythondeployer"
 	"os"
 	"os/signal"
@@ -21,14 +24,19 @@ func main() {
 	var image string
 	var file string
 	var stepID string
+	var pythonPath string
 	var d deployer.AnyConnectorFactory
 	var defaultConfig any
 	var deployerID = "docker"
+	var runID = "run"
+	var workingDir string
 	flag.StringVar(&image, "image", image, "Docker image to run")
 	flag.StringVar(&file, "file", file, "Input file")
 	flag.StringVar(&stepID, "step", stepID, "Step name")
 	flag.StringVar(&deployerID, "deployer", stepID, "The name of the deployer")
+	flag.StringVar(&pythonPath, "pythonpath", "", "Path to the Python environment")
+	flag.StringVar(&workingDir, "workingdir", "~/", "Path to store cloned repositories")
 	flag.Parse()
 
 	switch deployerID {
@@ -60,35 +68,56 @@ func main() {
 		if err != nil {
 			panic(err)
 		}
+	case "python":
+		pythonFactory := pythondeployer.NewFactory()
+		d = deployer.Any(pythonFactory)
+		configSchema := pythonFactory.ConfigurationSchema()
+		var err error
+		configInput := map[string]any{}
+		configInput["pythonPath"] = pythonPath
+		configInput["workdir"] = workingDir
+		defaultConfig, err = configSchema.UnserializeType(configInput)
+		if err != nil {
+			panic(err)
+		}
 	default:
-		panic("No deployer or invalid deployer selected. Options: docker, podman, testimpl. Select with -deployer")
+		panic("No deployer or invalid deployer selected. Options: docker, podman, testimpl, python. Select with -deployer")
 	}
-
-	connector, err := d.Create(defaultConfig, log.New(log.Config{
+	logger := log.New(log.Config{
 		Level:       log.LevelDebug,
 		Destination: log.DestinationStdout,
-	}))
+	})
+	connector, err := d.Create(defaultConfig, logger)
 	if err != nil {
 		panic(err)
 	}
 
 	ctrlC := make(chan os.Signal, 1)
 	signal.Notify(ctrlC, os.Interrupt)
+	// Set up the signal channel to send cancel signal on ctrl-c
+	toStepSignals := make(chan schema.Input, 3)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	go func() {
 		select {
 		case <-ctrlC:
-			fmt.Println("Received CTRL-C. Cancelling the context to cancel the step...")
+			logger.Infof("Received CTRL-C. Sending cancel signal...")
+			toStepSignals <- schema.Input{
+				RunID:     runID,
+				ID:        plugin.CancellationSignalSchema.ID(),
+				InputData: make(map[string]any),
+			}
+			logger.Infof("Signal request sent to ATP client.")
 			cancel()
 		case <-ctx.Done():
 			// Done here.
 		}
 	}()
 
-	fmt.Println("Deploying")
+	logger.Infof("Deploying")
 	plugin, err := connector.Deploy(ctx, image)
 	if err != nil {
+		logger.Errorf("Error while deploying: %s", err)
 		panic(err)
 	}
 	defer func() {
@@ -97,8 +126,8 @@ func main() {
 		}
 	}()
 
-	atpClient := atp.NewClient(plugin)
-	fmt.Println("Getting schema")
+	atpClient := atp.NewClientWithLogger(plugin, logger)
+	logger.Infof("Getting schema")
 	pluginSchema, err := atpClient.ReadSchema()
 	if err != nil {
 		panic(err)
 	}
@@ -120,15 +149,22 @@ func main() {
 		panic(err)
 	}
 	fmt.Printf("Running step %s\n", stepID)
-	outputID, outputData, err := atpClient.Execute(stepID, input)
+	result := atpClient.Execute(
+		schema.Input{RunID: runID, ID: stepID, InputData: input},
+		toStepSignals,
+		nil,
+	)
+	if err := atpClient.Close(); err != nil {
+		fmt.Printf("Error closing ATP client: %s", err)
+	}
 	output := map[string]any{
-		"outputID":   outputID,
-		"outputData": outputData,
-		"err":        err,
+		"outputID":   result.OutputID,
+		"outputData": result.OutputData,
+		"err":        result.Error,
 	}
-	result, err := yaml.Marshal(output)
+	resultStr, err := yaml.Marshal(output)
 	if err != nil {
 		panic(err)
 	}
-	fmt.Printf("%s", result)
+	fmt.Printf("%s", resultStr)
 }
diff --git a/engine_test.go b/engine_test.go
index bd5a45bf..3ff7cc5b 100644
--- a/engine_test.go
+++ b/engine_test.go
@@ -143,7 +143,7 @@ input:
             type_id: string
 steps:
   example:
-    plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.1.0
+    plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1
     input:
       name: !expr $.input.name
 output:
@@ -174,7 +174,7 @@ input:
             type_id: string
 steps:
   example:
-    plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.1.0
+    plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1
     input:
       name: !expr $.input.name
 outputs:
diff --git a/go.mod b/go.mod
index 97cb47c1..de8d6e7d 100644
--- a/go.mod
+++ b/go.mod
@@ -3,18 +3,18 @@ module go.flow.arcalot.io/engine
 go 1.18
 
 require (
-	go.arcalot.io/assert v1.3.0
+	go.arcalot.io/assert v1.6.0
 	go.arcalot.io/dgraph v1.1.0
 	go.arcalot.io/lang v1.0.0
 	go.arcalot.io/log/v2 v2.0.0
-	go.flow.arcalot.io/deployer v0.2.0
-	go.flow.arcalot.io/dockerdeployer v0.3.0
-	go.flow.arcalot.io/expressions v0.2.0
-	go.flow.arcalot.io/kubernetesdeployer v0.5.1
-	go.flow.arcalot.io/pluginsdk v0.3.0-beta.1
-	go.flow.arcalot.io/podmandeployer v0.3.1
-	go.flow.arcalot.io/pythondeployer v0.1.3
-	go.flow.arcalot.io/testdeployer v0.2.0
+	go.flow.arcalot.io/deployer v0.3.0
+	go.flow.arcalot.io/dockerdeployer v0.4.0
+	go.flow.arcalot.io/expressions v0.2.1
+	go.flow.arcalot.io/kubernetesdeployer v0.7.0
+	go.flow.arcalot.io/pluginsdk v0.5.0
+	go.flow.arcalot.io/podmandeployer v0.5.0
+	go.flow.arcalot.io/pythondeployer v0.3.0
+	go.flow.arcalot.io/testdeployer v0.3.0
 	gopkg.in/yaml.v3 v3.0.1
 )
 
@@ -22,12 +22,13 @@ require (
 	github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
 	github.com/Microsoft/go-winio v0.6.1 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
-	github.com/docker/distribution v2.8.2+incompatible // indirect
-	github.com/docker/docker v24.0.5+incompatible // indirect
+	github.com/distribution/reference v0.5.0 // indirect
+	github.com/docker/distribution v2.8.3+incompatible // indirect
+	github.com/docker/docker v24.0.6+incompatible // indirect
 	github.com/docker/go-connections v0.4.0 // indirect
 	github.com/docker/go-units v0.5.0 // indirect
 	github.com/emicklei/go-restful/v3 v3.10.0 // indirect
-	github.com/fxamacker/cbor/v2 v2.4.0 // indirect
+	github.com/fxamacker/cbor/v2 v2.5.0 // indirect
 	github.com/go-logr/logr v1.2.4 // indirect
 	github.com/go-openapi/jsonpointer v0.19.6 // indirect
 	github.com/go-openapi/jsonreference v0.20.2 // indirect
@@ -47,15 +48,16 @@ require (
 	github.com/opencontainers/go-digest v1.0.0 // indirect
 	github.com/opencontainers/image-spec v1.0.2 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
+	github.com/rogpeppe/go-internal v1.11.0 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
-	go.flow.arcalot.io/testplugin v0.1.0 // indirect
+	go.flow.arcalot.io/testplugin v0.2.1 // indirect
 	golang.org/x/mod v0.13.0 // indirect
 	golang.org/x/net v0.15.0 // indirect
 	golang.org/x/oauth2 v0.2.0 // indirect
 	golang.org/x/sys v0.12.0 // indirect
 	golang.org/x/term v0.12.0 // indirect
 	golang.org/x/text v0.13.0 // indirect
-	golang.org/x/time v0.2.0 // indirect
+	golang.org/x/time v0.3.0 // indirect
 	golang.org/x/tools v0.13.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/protobuf v1.28.1 // indirect
diff --git a/go.sum b/go.sum
index 50b30625..fc01050e 100644
--- a/go.sum
+++ b/go.sum
@@ -19,10 +19,12 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
-github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
-github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY=
-github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
+github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
+github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
+github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk=
+github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
+github.com/docker/docker v24.0.6+incompatible h1:hceabKCtUgDqPu+qm0NgsaXf28Ljf4/pWFL7xjWWDgE=
+github.com/docker/docker v24.0.6+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
 github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
 github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
 github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
@@ -39,8 +41,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
 github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
-github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88=
-github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
+github.com/fxamacker/cbor/v2 v2.5.0 h1:oHsG0V/Q6E/wqTS2O1Cozzsy69nqCiguo5Q1a1ADivE=
+github.com/fxamacker/cbor/v2 v2.5.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
 github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
@@ -90,7 +92,7 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
 github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
 github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
-github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
+github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
 github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
 github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -98,8 +100,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
-github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -108,7 +110,7 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0
 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
 github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
 github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
-github.com/moby/term v0.0.0-20221105221325-4eb28fa6025c h1:RC8WMpjonrBfyAh6VN/POIPtYD5tRAq0qMqCRjQNK+g=
+github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -129,6 +131,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
+github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
+github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
 github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
@@ -139,8 +143,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
 github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
@@ -148,32 +152,32 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:
 github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
-go.arcalot.io/assert v1.3.0 h1:+uQex4s9gezATpTyFxUY5dlAcrwI1Me5fSmdcydGHho=
-go.arcalot.io/assert v1.3.0/go.mod h1:Xy3ScX0p9IMY89gdsgexOKxnmDr0nGHG9dV7p8Uxg7w=
+go.arcalot.io/assert v1.6.0 h1:iKA8SZZ1MRblMX5QAwwY5RbpR+VNyp//4IU7vo08Xu0=
+go.arcalot.io/assert v1.6.0/go.mod h1:Xy3ScX0p9IMY89gdsgexOKxnmDr0nGHG9dV7p8Uxg7w=
 go.arcalot.io/dgraph v1.1.0 h1:c0LR7+xdUy7Ki6e4nR9rBvK0Upr4Nu49fu+poP/9WMg=
 go.arcalot.io/dgraph v1.1.0/go.mod h1:FuNv92OgHsJYepD6Unwn+S/4DioBnv06JxQ2BtQct7E=
 go.arcalot.io/lang v1.0.0 h1:mgDaieT4wWdZTnR4V7+/pgYRmzfU7VZZgIzHccuxAbY=
 go.arcalot.io/lang v1.0.0/go.mod h1:ALqfYEhAzC2WoGLaycmJoNJd5NmkR7V1PSKp/c5D278=
 go.arcalot.io/log/v2 v2.0.0 h1:mbmsWDVBXZNWrDzUh5JLzeGCQ59kTuMFs+pyfJGc1hk=
 go.arcalot.io/log/v2 v2.0.0/go.mod h1:1V8jnFIIGwh2CtcGkHNOmy1nCo7LbazQNkUcnKYNMn4=
-go.flow.arcalot.io/deployer v0.2.0 h1:CpkCYlB8NfpmELIEPdw3/al8XknCSfD/L2vie2lJBJo=
-go.flow.arcalot.io/deployer v0.2.0/go.mod h1:xVSB+svHVPmX6yTZIU0K4U/pDbs+rezsWa69vYA+E6k=
-go.flow.arcalot.io/dockerdeployer v0.3.0 h1:9F9ZyaiB1kwKLRMXAktkyD3eDVvWoMYSHCojTrPDusk=
-go.flow.arcalot.io/dockerdeployer v0.3.0/go.mod h1:9cMMHmkHxzHgLOLva6RmGb5d8jrbRkar/PcjeiU0QvA=
-go.flow.arcalot.io/expressions v0.2.0 h1:2k8InnpLqVmv5SDvJ1xRz1ubqF+zKN44U08D50TkqJs=
-go.flow.arcalot.io/expressions v0.2.0/go.mod h1:m4p6oCwjjxRjPCAYGeZDNP9DGfoDODHo1z4pBoO+Lpc=
-go.flow.arcalot.io/kubernetesdeployer v0.5.1 h1:YpPFSouEWjxN8sxdr4BuZMl3IM9jCGEXBAnyoWJnHls=
-go.flow.arcalot.io/kubernetesdeployer v0.5.1/go.mod h1:mdhwBGQ0wlquo+wUR0VNSAyj9NYxDdTwM7vYV1XmRnw=
-go.flow.arcalot.io/pluginsdk v0.3.0-beta.1 h1:RrC5SKDkhwG/enE/FajAxRF1izET61/LO4lhaI9q094=
-go.flow.arcalot.io/pluginsdk v0.3.0-beta.1/go.mod h1:7cEk8LSxpZakyfrmKTPbiMhlrZvWtCPYcaI7qfSu8MM=
-go.flow.arcalot.io/podmandeployer v0.3.1 h1:AbRmTTtuK50PLkZyu194oSra0zUCeM3lDCWTc7oo4Ys=
-go.flow.arcalot.io/podmandeployer v0.3.1/go.mod h1:SmROc9nHG+KfKasyTeKtGmI9EBlXCupXjBIgX5glGn8=
-go.flow.arcalot.io/pythondeployer v0.1.3 h1:3W0tm1kdIqpP6TKO7OvVCiDlUh39wQOFwDnHISReE9Q=
-go.flow.arcalot.io/pythondeployer v0.1.3/go.mod h1:vCkwB72TinFVb367/o0djptTvR+V004i1I5OQUeCcPU=
-go.flow.arcalot.io/testdeployer v0.2.0 h1:4/cLr58/e6o5ouVRuJ5hM28nhciwJrL9AOE5Sdb7rN0=
-go.flow.arcalot.io/testdeployer v0.2.0/go.mod h1:vy3Iu+9SHmugvOJRtMWAj8R+SE9BYi7k9Xi7DM5n7eQ=
-go.flow.arcalot.io/testplugin v0.1.0 h1:I2BT978XISjaSnQbpaJfmjo2cTmTeBV7q+1IwTGbrig=
-go.flow.arcalot.io/testplugin v0.1.0/go.mod h1:RsEWotEbX4irH+OM/d3dUOZMhHeBDDhqUymIKqZlESU=
+go.flow.arcalot.io/deployer v0.3.0 h1:LPikgRG5jGA76W8JthycvzfREL5Y0+++KAiQxSnKhdU=
+go.flow.arcalot.io/deployer v0.3.0/go.mod h1:x6gsz/hANR8qN1nerpyY3vXpdaqofDH5Wlg+Nsqg/x0=
+go.flow.arcalot.io/dockerdeployer v0.4.0 h1:t5b8o3xfKKb/WIX558486csjo4uMQmAXsikBLsKFEIg=
+go.flow.arcalot.io/dockerdeployer v0.4.0/go.mod h1:UZSM6buJBRlgCURUE/BVkak8tfAXzj3oeQBSRZECbSc=
+go.flow.arcalot.io/expressions v0.2.1 h1:TAAbDrgJJLpmgA5ASyP/KzrXWtpEaQ8JsCPHgpe5kLw=
+go.flow.arcalot.io/expressions v0.2.1/go.mod h1:Vw1ScNu4Uyw1/l87LAH8jxe0DyRWwMh+rlfB/BPYDOU=
+go.flow.arcalot.io/kubernetesdeployer v0.7.0 h1:r41qWc/XiPy9l3cfMXZG8F2kGenRh1xsx2auim/Ydyw=
+go.flow.arcalot.io/kubernetesdeployer v0.7.0/go.mod h1:VvU6duoo5NR2ITUhx/UCGrkdJnXIeYm+/yHmGKtkXsk=
+go.flow.arcalot.io/pluginsdk v0.5.0 h1:TRS/waCTcdoMZ9neDAcfy3zpzyDnPHRbhV+Y1kpcw3Y=
+go.flow.arcalot.io/pluginsdk v0.5.0/go.mod h1:2s2f//7uOkBjr1QaiWJD/bqDIeLlINJtD1BhiY4aGPM=
+go.flow.arcalot.io/podmandeployer v0.5.0 h1:h7hEhWUgxJzNKlEohZ+meKhl3FWjaXQahQ8vN3YVRNs=
+go.flow.arcalot.io/podmandeployer v0.5.0/go.mod h1:36JCcTB6nauahcXUPfIpdEw7Zfp0ufM07o3VNTvrCc0=
+go.flow.arcalot.io/pythondeployer v0.3.0 h1:ercLuDwFoDSL0f6YvZEqFW0/nO7Yv7DkbROl3rKxYDk=
+go.flow.arcalot.io/pythondeployer v0.3.0/go.mod h1:ND1x/Vhu/6q50zQeisCcD6oQ6lKVJFflOrfDccnIjSY=
+go.flow.arcalot.io/testdeployer v0.3.0 h1:Soyz+rDa3Y3VjWBGuL3zNlX3LM4uKp9Ex7///fCgrZA=
+go.flow.arcalot.io/testdeployer v0.3.0/go.mod h1:Eel0ORhtKdYYDsd+e+btBBygIn+9Sz/b+JFDwH39VWI=
+go.flow.arcalot.io/testplugin v0.2.1 h1:9kQ2MKvcXtEcwk5c4qSWN+FovpER2C9vn730laAm9iE=
+go.flow.arcalot.io/testplugin v0.2.1/go.mod h1:ZoVF8tIKppQmj5nvoZPA48GQ7BuoWXQcuCw2x2sJxjE=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -234,8 +238,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
 golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
-golang.org/x/time v0.2.0 h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=
-golang.org/x/time v0.2.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
+golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@@ -295,7 +299,7 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
 gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o=
+gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 k8s.io/api v0.25.4 h1:3YO8J4RtmG7elEgaWMb4HgmpS2CfY1QlaOz9nwB+ZSs=
diff --git a/internal/step/dummy/provider.go b/internal/step/dummy/provider.go
index cc99f333..9df7f8f9 100644
--- a/internal/step/dummy/provider.go
+++ b/internal/step/dummy/provider.go
@@ -176,7 +176,7 @@ func (r *runnableStep) Lifecycle(_ map[string]any) (result step.Lifecycle[step.L
 	}, nil
 }
 
-func (r *runnableStep) Start(_ map[string]any, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) {
+func (r *runnableStep) Start(_ map[string]any, runID string, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 
@@ -189,6 +189,7 @@ func (r *runnableStep) Start(_ map[string]any, stageChangeHandler step.StageChan
 		// the ProvideInputStage is not blocked.
 		name:               make(chan string, 1),
 		state:              step.RunningStepStateStarting,
+		runID:              runID,
 	}
 
 	go s.run()
@@ -200,10 +201,12 @@ type runningStep struct {
 	stageChangeHandler step.StageChangeHandler
 	ctx                context.Context
 	cancel             context.CancelFunc
+	wg                 sync.WaitGroup
 	lock               *sync.Mutex
 	name               chan string
 	state              step.RunningStepState
 	inputAvailable     bool
+	runID              string
 }
 
 func (r *runningStep) State() step.RunningStepState {
@@ -249,8 +252,16 @@ func (r *runningStep) Close() error {
 	return nil
 }
 
+func (r *runningStep) ForceClose() error {
+	return r.Close()
+}
+
 func (r *runningStep) run() {
-	defer close(r.name)
+	r.wg.Add(1)
+	defer func() {
+		close(r.name)
+		r.wg.Done()
+	}()
 	waitingForInput := false
 	r.lock.Lock()
 	if !r.inputAvailable {
@@ -267,6 +278,7 @@ func (r *runningStep) run() {
 		nil,
 		string(StageIDGreet),
 		waitingForInput,
+		&r.wg,
 	)
 	select {
 	case name, ok := <-r.name:
@@ -283,7 +295,7 @@ func (r *runningStep) run() {
 		r.lock.Lock()
 		r.state = step.RunningStepStateFinished
 		r.lock.Unlock()
-		r.stageChangeHandler.OnStepComplete(r, string(StageIDGreet), outputID, outputData)
+		r.stageChangeHandler.OnStepComplete(r, string(StageIDGreet), outputID, outputData, &r.wg)
 	case <-r.ctx.Done():
 		return
 	}
diff --git a/internal/step/dummy/provider_test.go b/internal/step/dummy/provider_test.go
index 6ac333c2..622896e4 100644
--- a/internal/step/dummy/provider_test.go
+++ b/internal/step/dummy/provider_test.go
@@ -2,6 +2,7 @@ package dummy_test
 
 import (
 	"fmt"
+	"sync"
 	"testing"
 
 	"go.arcalot.io/assert"
@@ -13,7 +14,7 @@ type stageChangeHandler struct {
 	message chan string
 }
 
-func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) {
+func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) {
 
 }
 
@@ -22,6 +23,7 @@ func (s *stageChangeHandler) OnStepComplete(
 	previousStage string,
 	previousStageOutputID *string,
 	previousStageOutput *any,
+	_ *sync.WaitGroup,
 ) {
 	if previousStage != "greet" {
 		panic(fmt.Errorf("invalid previous stage: %s", previousStage))
@@ -49,7 +51,7 @@ func TestProvider(t *testing.T) {
 		message: make(chan string),
 	}
 
-	running, err := runnable.Start(map[string]any{}, handler)
+	running, err := runnable.Start(map[string]any{}, t.Name(), handler)
 	assert.NoError(t, err)
 	assert.NoError(t, running.ProvideStageInput("greet", map[string]any{
 		"name": "Arca Lot",
diff --git a/internal/step/foreach/provider.go b/internal/step/foreach/provider.go
index b0fa5d37..55d3b58b 100644
--- a/internal/step/foreach/provider.go
+++ b/internal/step/foreach/provider.go
@@ -19,7 +19,7 @@ func New(
 	executorFactory func(logger log.Logger) (workflow.Executor, error),
 ) (step.Provider, error) {
 	return &forEachProvider{
-		logger:            logger,
+		logger:            logger.WithLabel("source", "foreach-provider"),
 		yamlParserFactory: yamlParserFactory,
 		executorFactory:   executorFactory,
 	}, nil
@@ -319,9 +319,10 @@ func (r *runnableStep) RunSchema() map[string]*schema.PropertySchema {
 	return map[string]*schema.PropertySchema{}
 }
 
-func (r *runnableStep) Start(_ map[string]any, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) {
+func (r *runnableStep) Start(_ map[string]any, runID string, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	rs := &runningStep{
+		runID:              runID,
 		ctx:                ctx,
 		cancel:             cancel,
 		lock:               &sync.Mutex{},
@@ -338,6 +339,7 @@ func (r *runnableStep) Start(_ map[string]any, stageChangeHan
 }
 
 type runningStep struct {
+	runID              string
 	workflow           workflow.ExecutableWorkflow
 	currentStage       StageID
 	lock               *sync.Mutex
@@ -345,6 +347,7 @@ type runningStep struct {
 	inputAvailable     bool
 	inputData          chan []any
 	ctx                context.Context
+	wg                 sync.WaitGroup
 	cancel             context.CancelFunc
 	stageChangeHandler step.StageChangeHandler
 	parallelism        int64
@@ -363,13 +366,13 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro
 			_, err := r.workflow.Input().Unserialize(item)
 			if err != nil {
 				r.lock.Unlock()
-				return fmt.Errorf("invalid input item %d for subworkflow (%w)", i, err)
+				return fmt.Errorf("invalid input item %d for subworkflow (%w) for run/step %s", i, err, r.runID)
 			}
 			input[i] = item
 		}
 		if r.inputAvailable {
 			r.lock.Unlock()
-			return fmt.Errorf("input for execute workflow provided twice")
+			return fmt.Errorf("input for execute workflow provided twice for run/step %s", r.runID)
 		}
 		if r.currentState == step.RunningStepStateWaitingForInput && r.currentStage == StageIDExecute {
 			r.currentState = step.RunningStepStateRunning
@@ -404,11 +407,21 @@ func (r *runningStep) State() step.RunningStepState {
 
 func (r *runningStep) Close() error {
 	r.cancel()
+	r.wg.Wait()
 	return nil
 }
 
+func (r *runningStep) ForceClose() error {
+	// For now, unless it becomes a problem, we'll just call the normal close function.
+	return r.Close()
+}
+
 func (r *runningStep) run() {
-	defer close(r.inputData)
+	r.wg.Add(1)
+	defer func() {
+		close(r.inputData)
+		r.wg.Done()
+	}()
 	waitingForInput := false
 	r.lock.Lock()
 	if !r.inputAvailable {
@@ -425,6 +438,7 @@ func (r *runningStep) run() {
 		nil,
 		string(StageIDExecute),
 		waitingForInput,
+		&r.wg,
 	)
 	select {
 	case loopData, ok := <-r.inputData:
@@ -435,7 +449,7 @@ func (r *runningStep) run() {
 		itemOutputs := make([]any, len(loopData))
 		itemErrors := make(map[int]string, len(loopData))
 
-		r.logger.Debugf("Executing subworkflow...")
+		r.logger.Debugf("Executing subworkflow for step %s...", r.runID)
 		wg := &sync.WaitGroup{}
 		wg.Add(len(loopData))
 		errors := false
@@ -471,7 +485,7 @@ func (r *runningStep) run() {
 			}()
 		}
 		wg.Wait()
-		r.logger.Debugf("Subworkflow complete.")
+		r.logger.Debugf("Subworkflow %s complete.", r.runID)
 		r.lock.Lock()
 		previousStage := string(r.currentStage)
 		r.currentState = step.RunningStepStateRunning
@@ -506,12 +520,13 @@ func (r *runningStep) run() {
 			nil,
 			string(currentStage),
 			false,
+			&r.wg,
 		)
 		r.lock.Lock()
 		r.currentState = step.RunningStepStateFinished
 		previousStage = string(r.currentStage)
 		r.lock.Unlock()
-		r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData)
+		r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg)
 	case <-r.ctx.Done():
 		return
 	}
diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go
index a1708766..71fc0354 100644
--- a/internal/step/plugin/provider.go
+++ b/internal/step/plugin/provider.go
@@ -3,8 +3,11 @@ package plugin
 import (
 	"context"
 	"fmt"
+	"go.flow.arcalot.io/pluginsdk/plugin"
+	"reflect"
 	"strings"
 	"sync"
+	"time"
 
 	log "go.arcalot.io/log/v2"
 	"go.flow.arcalot.io/deployer"
@@ -28,12 +31,12 @@ func New(logger log.Logger, deployerRegistry registry.Registry, localDeployerCon
 	if err != nil {
 		return nil, fmt.Errorf("failed to load local deployer configuration, please check your Arcaflow configuration file (%w)", err)
 	}
-	localDeployer, err := deployerRegistry.Create(unserializedLocalDeployerConfig, logger)
+	localDeployer, err := deployerRegistry.Create(unserializedLocalDeployerConfig, logger.WithLabel("source", "deployer"))
 	if err != nil {
 		return nil, fmt.Errorf("invalid local deployer configuration, please check your Arcaflow configuration file (%w)", err)
 	}
 	return &pluginProvider{
-		logger:           logger,
+		logger:           logger.WithLabel("source", "plugin-provider"),
 		deployerRegistry: deployerRegistry,
 		localDeployer:    localDeployer,
 	}, nil
@@ -166,6 +169,7 @@ var crashedLifecycleStage = step.LifecycleStage{
 	FinishedName: "crashed",
 }
 
+// Lifecycle returns a lifecycle that contains all plugin lifecycle stages.
 func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] {
 	return step.Lifecycle[step.LifecycleStage]{
 		InitialStage: string(StageIDDeploy),
@@ -181,11 +185,14 @@ func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] {
 	}
 }
 
+// LoadSchema deploys the plugin, connects to the plugin's ATP server, loads its schema, then
+// returns a runnableStep struct. Not to be confused with the runningStep struct.
 func (p *pluginProvider) LoadSchema(inputs map[string]any, _ map[string][]byte) (step.RunnableStep, error) {
 	image := inputs["plugin"].(string)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+
 	plugin, err := p.localDeployer.Deploy(ctx, image)
 	if err != nil {
 		cancel()
@@ -199,13 +206,17 @@ func (p *pluginProvider) LoadSchema(inputs map[string]any, _ map[string][]byte)
 		cancel()
 		return nil, fmt.Errorf("failed to read plugin schema from %s (%w)", image, err)
 	}
+	// Tell the server that the client is done
+	if err := transport.Close(); err != nil {
+		return nil, fmt.Errorf("failed to instruct client to shut down image %s (%w)", image, err)
+	}
 	// Shut down the plugin.
 	if err := plugin.Close(); err != nil {
 		return nil, fmt.Errorf("failed to shut down local plugin from %s (%w)", image, err)
 	}
 
 	return &runnableStep{
-		schemas:          s,
+		schemas:          *s,
 		logger:           p.logger,
 		image:            image,
 		deployerRegistry: p.deployerRegistry,
@@ -217,7 +228,7 @@ type runnableStep struct {
 	image            string
 	deployerRegistry registry.Registry
 	logger           log.Logger
-	schemas          schema.Schema[schema.Step]
+	schemas          schema.SchemaSchema
 	localDeployer    deployer.Connector
 }
 
@@ -261,6 +272,32 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
 	if !ok {
 		return result, fmt.Errorf("the step '%s' does not exist in the '%s' plugin", stepID, r.image)
 	}
+
+	stopIfProperty := schema.NewPropertySchema(
+		schema.NewAnySchema(),
+		schema.NewDisplayValue(
+			schema.PointerTo("Stop condition"),
+			schema.PointerTo("If this field is filled with a non-false value, the step is cancelled (even if currently executing)."),
+			nil,
+		),
+		false,
+		nil,
+		nil,
+		nil,
+		nil,
+		nil,
+	)
+	// Now validate that the step's internal dependencies can be resolved (like stop_if's dependency on the cancel signal)
+	cancelSignal := stepSchema.SignalHandlers()[plugin.CancellationSignalSchema.ID()]
+	if cancelSignal == nil {
+		// Not present
+		stopIfProperty.Disable(fmt.Sprintf("Cancel signal with ID '%s' is not present in plugin image '%s', step '%s'. Signal handler IDs present: %v",
+			plugin.CancellationSignalSchema.ID(), r.image, stepID, reflect.ValueOf(stepSchema.SignalHandlers()).MapKeys()))
+	} else if err := plugin.CancellationSignalSchema.DataSchemaValue.ValidateCompatibility(cancelSignal.DataSchemaValue); err != nil {
+		// Present but incompatible
+		stopIfProperty.Disable(fmt.Sprintf("Cancel signal invalid schema in plugin image '%s', step '%s' (%s)", r.image, stepID, err))
+	}
+
 	return step.Lifecycle[step.LifecycleStageWithSchema]{
 		InitialStage: "deploying",
 		Stages: []step.LifecycleStageWithSchema{
@@ -350,20 +387,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
 			{
 				LifecycleStage: cancelledLifecycleStage,
 				InputSchema: map[string]*schema.PropertySchema{
-					"stop_if": schema.NewPropertySchema(
-						schema.NewAnySchema(),
-						schema.NewDisplayValue(
-							schema.PointerTo("Stop condition"),
-							schema.PointerTo("If this field is filled with a non-false value, the step is cancelled (even if currently executing)."),
-							nil,
-						),
-						false,
-						nil,
-						nil,
-						nil,
-						nil,
-						nil,
-					),
+					"stop_if": stopIfProperty,
 				},
 				Outputs: nil,
 			},
@@ -403,7 +427,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
 	}, nil
 }
 
-func (r *runnableStep) Start(input map[string]any, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) {
+func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) {
 	rawStep, ok := input["step"]
 	stepID := ""
 	if ok && rawStep != nil {
@@ -448,15 +472,17 @@ func (r *runnableStep) Start(input map[string]any, stageChangeHandler step.Stage
 		lock:               &sync.Mutex{},
 		ctx:                ctx,
 		cancel:             cancel,
-		done:               make(chan struct{}),
 		deployInput:        make(chan any, 1),
 		runInput:           make(chan any, 1),
 		logger:             r.logger,
 		image:              r.image,
-		step:               stepID,
+		pluginStepID:       stepID,
 		state:              step.RunningStepStateStarting,
 		localDeployer:      r.localDeployer,
-		executionChannel:   make(chan executionResult),
+		executionChannel:   make(chan atp.ExecutionResult),
+		signalToStep:       make(chan schema.Input),
+		signalFromStep:     make(chan schema.Input),
+		runID:              runID,
 	}
 
 	go s.run()
@@ -469,34 +495,43 @@ type runningStep struct {
 	stepSchema           schema.Step
 	stageChangeHandler   step.StageChangeHandler
 	lock                 *sync.Mutex
+	wg                   sync.WaitGroup
 	ctx                  context.Context
 	cancel               context.CancelFunc
-	done                 chan struct{}
+	atpClient            atp.Client
 	deployInput          chan any
 	deployInputAvailable bool
 	runInput             chan any
 	runInputAvailable    bool
 	logger               log.Logger
 	currentStage         StageID
+	runID                string // The ID associated with this execution (the workflow step ID)
 	image                string
-	step                 string
+	pluginStepID         string // The ID of the step in the plugin
 	state                step.RunningStepState
 	useLocalDeployer     bool
 	localDeployer        deployer.Connector
 	container            deployer.Plugin
-	executionChannel     chan executionResult
+	executionChannel     chan atp.ExecutionResult
+	signalToStep         chan schema.Input // Communicates with the ATP client, not other steps.
+	signalFromStep       chan schema.Input // Communicates with the ATP client, not other steps.
+	closed               bool
+	// Store channels for sending pre-calculated signal outputs to other steps?
+	// Store channels for receiving pre-calculated signal inputs from other steps?
 }
 
 func (r *runningStep) CurrentStage() string {
 	r.lock.Lock()
 	defer r.lock.Unlock()
-	return string(r.currentStage)
+	tempStage := string(r.currentStage)
+	return tempStage
 }
 
 func (r *runningStep) State() step.RunningStepState {
 	r.lock.Lock()
 	defer r.lock.Unlock()
-	return r.state
+	tempState := r.state
+	return tempState
 }
 
 func (r *runningStep) ProvideStageInput(stage string, input map[string]any) error {
@@ -504,104 +539,173 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro
 	// affect the counting of step states in the workflow's Execute function
 	// and notifySteps function.
 	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.logger.Debugf("ProvideStageInput START")
+	defer r.logger.Debugf("ProvideStageInput END")
 
-	// Checks which stage it is getting input for
+	// Checks which stage it is getting input for, and handles it.
 	switch stage {
 	case string(StageIDDeploy):
-		// input provided on this call overwrites the deployer configuration
-		// set at this plugin provider's instantiation
-		if r.deployInputAvailable {
-			r.lock.Unlock()
-			return fmt.Errorf("deployment information provided more than once")
-		}
-		var unserializedDeployerConfig any
-		var err error
-		if input["deploy"] != nil {
-			unserializedDeployerConfig, err = r.deployerRegistry.Schema().Unserialize(input["deploy"])
-			if err != nil {
-				r.lock.Unlock()
-				return fmt.Errorf("invalid deployment information (%w)", err)
-			}
-		} else {
-			r.useLocalDeployer = true
-		}
-		// Make sure we transition the state before unlocking so there are no race conditions.
-
-		r.deployInputAvailable = true
-		if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDDeploy {
-			r.state = step.RunningStepStateRunning
-		}
-		r.lock.Unlock()
-		// Feed the deploy step its input.
-		r.deployInput <- unserializedDeployerConfig
-		return nil
+		return r.provideDeployInput(input)
 	case string(StageIDStarting):
-		if r.runInputAvailable {
-			r.lock.Unlock()
-			return fmt.Errorf("input provided more than once")
-		}
-		if input["input"] == nil {
-			r.lock.Unlock()
-			return fmt.Errorf("bug: invalid input for 'running' stage, expected 'input' field")
-		}
-		if _, err := r.stepSchema.Input().Unserialize(input["input"]); err != nil {
-			r.lock.Unlock()
-			return err
-		}
-		// Make sure we transition the state before unlocking so there are no race conditions.
-		r.runInputAvailable = true
-		if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDStarting {
-			r.state = step.RunningStepStateRunning
-		}
-		// Unlock before passing the data over the channel to prevent a deadlock.
-		// The other end of the channel needs to be unlocked to read the data.
-		r.lock.Unlock()
-		// Feed the run step its input over the channel.
-		r.runInput <- input["input"]
-		return nil
+		return r.provideStartingInput(input)
 	case string(StageIDRunning):
-		r.lock.Unlock()
 		return nil
 	case string(StageIDCancelled):
-		if input["stop_if"] != false && input["stop_if"] != nil {
-			r.logger.Infof("Cancelling step %s", r.step)
-			r.cancel() // This should cancel the plugin deployment or execution.
-		}
-		r.lock.Unlock()
-		return nil
+		return r.provideCancelledInput(input)
 	case string(StageIDDeployFailed):
-		r.lock.Unlock()
 		return nil
 	case string(StageIDCrashed):
-		r.lock.Unlock()
 		return nil
 	case string(StageIDOutput):
-		r.lock.Unlock()
 		return nil
 	default:
-		r.lock.Unlock()
 		return fmt.Errorf("bug: invalid stage: %s", stage)
 	}
 }
 
+func (r *runningStep) provideDeployInput(input map[string]any) error {
+	// Note: The calling function must have the step mutex locked
+	// input provided on this call overwrites the deployer configuration
+	// set at this plugin provider's instantiation
+	if r.deployInputAvailable {
+		return fmt.Errorf("deployment information provided more than once")
+	}
+	var unserializedDeployerConfig any
+	var err error
+	if input["deploy"] != nil {
+		unserializedDeployerConfig, err = r.deployerRegistry.Schema().Unserialize(input["deploy"])
+		if err != nil {
+			return fmt.Errorf("invalid deployment information (%w)", err)
+		}
+	} else {
+		r.useLocalDeployer = true
+	}
+	// Make sure we transition the state before unlocking so there are no race conditions.
+
+	r.deployInputAvailable = true
+	if r.state == step.RunningStepStateWaitingForInput && r.currentStage == StageIDDeploy {
+		r.state = step.RunningStepStateRunning
+	}
+
+	// Feed the deploy step its input.
+	select {
+	case r.deployInput <- unserializedDeployerConfig:
+	default:
+		return fmt.Errorf("unable to provide input to deploy stage for step %s/%s", r.runID, r.pluginStepID)
+	}
+	return nil
+}
+
+func (r *runningStep) provideStartingInput(input map[string]any) error {
+	// Note: The calling function must have the step mutex locked
+	if r.runInputAvailable {
+		return fmt.Errorf("input provided more than once")
+	}
+	// Ensure input is given
+	if input["input"] == nil {
+		return fmt.Errorf("bug: invalid input for 'running' stage, expected 'input' field")
+	}
+	// Validate the input by unserializing it
+	if _, err := r.stepSchema.Input().Unserialize(input["input"]); err != nil {
+		return err
+	}
+	// Make sure we transition the state before unlocking so there are no race conditions.
+	r.runInputAvailable = true
+
+	// Unlock before passing the data over the channel to prevent a deadlock.
+	// The other end of the channel needs to be unlocked to read the data.
+
+	// Feed the run step its input over the channel.
+	select {
+	case r.runInput <- input["input"]:
+	default:
+		return fmt.Errorf("unable to provide input to run stage for step %s/%s", r.runID, r.pluginStepID)
+	}
+	return nil
+}
+
+func (r *runningStep) provideCancelledInput(input map[string]any) error {
+	// Note: The calling function must have the step mutex locked
+	// Cancel if the step field is present and isn't false
+	if input["stop_if"] != false && input["stop_if"] != nil {
+		r.cancelStep()
+	}
+	return nil
+}
+
+// cancelStep gracefully requests cancellation for any stage.
+// If running, it sends a cancel signal if the plugin supports it.
+func (r *runningStep) cancelStep() {
+	r.logger.Infof("Cancelling step %s/%s", r.runID, r.pluginStepID)
+	// We only need to call the signal if the step is running.
+	// If it isn't, cancelling the context alone should be enough.
+	if r.currentStage == StageIDRunning {
+		// Verify that the step has a cancel signal
+		cancelSignal := r.stepSchema.SignalHandlers()[plugin.CancellationSignalSchema.ID()]
+		if cancelSignal == nil {
+			r.logger.Errorf("could not cancel step %s/%s. Does not contain cancel signal receiver.", r.runID, r.pluginStepID)
+		} else if err := plugin.CancellationSignalSchema.DataSchema().ValidateCompatibility(cancelSignal.DataSchema()); err != nil {
+			r.logger.Errorf("validation failed for cancel signal for step %s/%s: %s", r.runID, r.pluginStepID, err)
+		} else {
+			// Validated. Now call the signal.
+			r.signalToStep <- schema.Input{RunID: r.runID, ID: cancelSignal.ID(), InputData: map[any]any{}}
+		}
+	}
+	// Now cancel the context to stop the non-running parts of the step
+	r.cancel()
+}
+
+// ForceClose closes the step without waiting for a graceful shutdown of the ATP client.
+// Warning: This means that it won't wait for the ATP client to finish. This is okay if using a deployer that
+// will stop execution once the deployer closes it.
+func (r *runningStep) ForceClose() error {
+	err := r.closeComponents(false)
+	// Wait for the run to finish to ensure that it's not running after closing.
+	r.wg.Wait()
+	r.closed = true
+	r.logger.Warningf("Step %s/%s force closed.", r.runID, r.pluginStepID)
+	return err
+}
+
 func (r *runningStep) Close() error {
+	err := r.closeComponents(true)
+	// Wait for the run to finish to ensure that it's not running after closing.
+	r.wg.Wait()
+	r.closed = true
+	return err
+}
+
+func (r *runningStep) closeComponents(closeATP bool) error {
 	r.cancel()
 	r.lock.Lock()
+	if r.closed {
+		return nil // Already closed
+	}
+	var atpErr error
+	var containerErr error
+	if r.atpClient != nil && closeATP {
+		atpErr = r.atpClient.Close()
+	}
 	if r.container != nil {
-		if err := r.container.Close(); err != nil {
-			return fmt.Errorf("failed to stop container (%w)", err)
-		}
+		containerErr = r.container.Close()
 	}
 	r.container = nil
 	r.lock.Unlock()
-	<-r.done
+	if containerErr != nil {
+		return fmt.Errorf("error while stopping container (%w)", containerErr)
+	} else if atpErr != nil {
+		return fmt.Errorf("error while stopping atp client (%w)", atpErr)
+		// Do not wait in this case. It may never get resolved.
+	}
 	return nil
 }
 
 func (r *runningStep) run() {
+	r.wg.Add(1) // Wait for the run to finish before closing.
 	defer func() {
-		r.cancel()
-		close(r.done)
+		r.cancel()  // Close before WaitGroup done
+		r.wg.Done() // Done. Close may now exit.
 	}()
 	container, err := r.deployStage()
 	if err != nil {
@@ -612,7 +716,7 @@ func (r *runningStep) run() {
 	select {
 	case <-r.ctx.Done():
 		if err := container.Close(); err != nil {
-			r.logger.Warningf("failed to remove deployed container for step %s", r.step)
+			r.logger.Warningf("failed to remove deployed container for step %s/%s", r.runID, r.pluginStepID)
 		}
 		r.lock.Unlock()
 		return
@@ -620,6 +724,7 @@ func (r *runningStep) run() {
 		r.container = container
 	}
 	r.lock.Unlock()
+	r.logger.Debugf("Successfully deployed container with ID '%s' for step %s/%s", container.ID(), r.runID, r.pluginStepID)
 
 	if err := r.startStage(container); err != nil {
 		r.startFailed(err)
 		return
@@ -630,12 +735,9 @@ func (r *runningStep) run() {
 }
 
 func (r *runningStep) deployStage() (deployer.Plugin, error) {
+	r.logger.Debugf("Deploying stage for step %s/%s", r.runID, r.pluginStepID)
 	r.lock.Lock()
-	if !r.deployInputAvailable {
-		r.state = step.RunningStepStateWaitingForInput
-	} else {
-		r.state = step.RunningStepStateRunning
-	}
+	r.state = step.RunningStepStateRunning
 	deployInputAvailable := r.deployInputAvailable
 	r.lock.Unlock()
 
@@ -646,24 +748,39 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) {
 		nil,
 		string(StageIDDeploy),
 		deployInputAvailable,
+		&r.wg,
 	)
 
 	var deployerConfig any
 	var useLocalDeployer bool
+	// First, non-blocking retrieval
 	select {
 	case deployerConfig = <-r.deployInput:
 		r.lock.Lock()
 		r.state = step.RunningStepStateRunning
-		useLocalDeployer = r.useLocalDeployer
 		r.lock.Unlock()
-	case <-r.ctx.Done():
-		return nil, fmt.Errorf("step closed before deployment config could be obtained")
+	default: // Default, so it doesn't block on this receive
+		// It's waiting now.
+		r.lock.Lock()
+		r.state = step.RunningStepStateWaitingForInput
+		r.lock.Unlock()
+		select {
+		case deployerConfig = <-r.deployInput:
+			r.lock.Lock()
+			r.state = step.RunningStepStateRunning
+			r.lock.Unlock()
+		case <-r.ctx.Done():
+			return nil, fmt.Errorf("step closed before deployment config could be obtained")
+		}
 	}
+	r.lock.Lock()
+	useLocalDeployer = r.useLocalDeployer
+	r.lock.Unlock()
 
 	var stepDeployer = r.localDeployer
 	if !useLocalDeployer {
 		var err error
-		stepDeployer, err = r.deployerRegistry.Create(deployerConfig, r.logger)
+		stepDeployer, err = r.deployerRegistry.Create(deployerConfig, r.logger.WithLabel("source", "deployer"))
 		if err != nil {
 			return nil, err
 		}
@@ -675,22 +792,23 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) {
 	return container, nil
 }
 
-type executionResult struct {
-	outputID   string
-	outputData any
-	err        error
-}
-
 func (r *runningStep) startStage(container deployer.Plugin) error {
+	r.logger.Debugf("Starting stage for step %s/%s", r.runID, r.pluginStepID)
 	r.lock.Lock()
 	previousStage := string(r.currentStage)
 	r.currentStage = StageIDStarting
+	inputReceivedEarly := false
 
-	if !r.runInputAvailable {
-		r.state = step.RunningStepStateWaitingForInput
-	} else {
+	var runInput any
+	select {
+	case runInput = <-r.runInput:
+		// Good. It received it immediately.
 		r.state = step.RunningStepStateRunning
+		inputReceivedEarly = true
+	default: // The default makes it not wait.
+		r.state = step.RunningStepStateWaitingForInput
 	}
+
 	runInputAvailable := r.runInputAvailable
 	r.lock.Unlock()
 
@@ -701,215 +819,178 @@ func (r *runningStep) startStage(container deployer.Plugin) error {
 		nil,
 		string(StageIDStarting),
 		runInputAvailable,
+		&r.wg,
 	)
 	r.lock.Lock()
 	r.currentStage = StageIDStarting
-	r.state = step.RunningStepStateWaitingForInput
+	r.logger.Debugf("Waiting for input state while starting 2.")
 	r.lock.Unlock()
 
-	var runInput any
-	select {
-	case runInput = <-r.runInput:
+	// First, try a non-blocking retrieval of the runInput.
+	// If not yet available, set the state to waiting for input and do a blocking receive.
+	// If it is available, continue.
+	if !inputReceivedEarly {
+		// Input is not yet available. Now waiting.
 		r.lock.Lock()
-		r.state = step.RunningStepStateRunning
+		if r.state != step.RunningStepStateWaitingForInput {
+			r.logger.Warningf("State not waiting for input when receiving from channel.")
+		}
 		r.lock.Unlock()
-	case <-r.ctx.Done():
-		return fmt.Errorf("step closed while waiting for run configuration")
+
+		// Do a blocking wait for input now.
+		select {
+		case runInput = <-r.runInput:
+			r.lock.Lock()
+			r.state = step.RunningStepStateRunning
+			r.lock.Unlock()
+		case <-r.ctx.Done():
+			return fmt.Errorf("step closed while waiting for run configuration")
+		}
 	}
 
-	atpClient := atp.NewClientWithLogger(container, r.logger)
+	r.atpClient = atp.NewClientWithLogger(container, r.logger)
 
-	inputSchema, err := atpClient.ReadSchema()
+	inputSchema, err := r.atpClient.ReadSchema()
 	if err != nil {
 		return err
 	}
 	steps := inputSchema.Steps()
-	stepSchema, ok := steps[r.step]
+	stepSchema, ok := steps[r.pluginStepID]
 	if !ok {
-		return fmt.Errorf("schema mismatch between local and remote deployed plugin, no stepSchema named %s found in remote", r.step)
+		return fmt.Errorf("error in run step %s: schema mismatch between local and remote deployed plugin, no stepSchema named %s found in remote", r.runID, r.pluginStepID)
 	}
+	// Re-verify input. This should have also been done earlier.
 	if _, err := stepSchema.Input().Unserialize(runInput); err != nil {
-		return fmt.Errorf("schema mismatch between local and remote deployed plugin, unserializing input failed (%w)", err)
+		return fmt.Errorf("schema mismatch between local and remote deployed plugin in step %s/%s, unserializing input failed (%w)", r.runID, r.pluginStepID, err)
 	}
 
-	// Runs the ATP client in a goroutine in order to wait for it or context done.
-	// On context done, the deployer tries to end execution. That will shut down
-	// (with sigterm) the container. Then wait for output, or error out.
+	// Runs the ATP client in a goroutine in order to wait for it.
+	// On context done, the deployer has 30 seconds before it will error out.
 	go func() {
-		outputID, outputData, err := atpClient.Execute(r.step, runInput)
-		r.executionChannel <- executionResult{outputID, outputData, err}
+		result := r.atpClient.Execute(
+			schema.Input{RunID: r.runID, ID: r.pluginStepID, InputData: runInput},
+			r.signalToStep,
+			r.signalFromStep,
+		)
+		r.executionChannel <- result
+		if err = r.atpClient.Close(); err != nil {
+			r.logger.Warningf("Error while closing ATP client: %s", err)
+		}
 	}()
 	return nil
 }
 
 func (r *runningStep) runStage() error {
-	r.lock.Lock()
-	previousStage := string(r.currentStage)
-	r.currentStage = StageIDRunning
-	r.state = step.RunningStepStateRunning
-	r.lock.Unlock()
-
-	r.stageChangeHandler.OnStageChange(
-		r,
-		&previousStage,
-		nil,
-		nil,
-		string(StageIDRunning),
-		false,
-	)
+	r.logger.Debugf("Running stage for step %s/%s", r.runID, r.pluginStepID)
+	r.transitionStage(StageIDRunning, step.RunningStepStateRunning)
 
-	var result executionResult
+	var result atp.ExecutionResult
 	select {
 	case result = <-r.executionChannel:
-		if result.err != nil {
-			return result.err
+		if result.Error != nil {
+			return result.Error
 		}
 	case <-r.ctx.Done():
-		// In this case, it is being instructed to stop.
+		// In this case, it is being instructed to stop. A signal should have been sent.
 		// Shutdown (with sigterm) the container, then wait for the output (valid or error).
-		r.logger.Debugf("Running step context done before step run complete. Cancelling and waiting for result.")
-		r.cancel()
-		// If necessary, you can add a timeout here for shutdowns that take too long.
-		result = <-r.executionChannel
-	}
-
-	// Execution complete, move to finished stage.
-	r.lock.Lock()
-	// Be careful that everything here is set correctly.
-	// Else it will cause undesired behavior.
-	previousStage = string(r.currentStage)
-	r.currentStage = StageIDOutput
-	// First running, then state change, then finished.
-	// This is so it properly steps through all the stages it needs to.
-	r.state = step.RunningStepStateRunning
-	r.lock.Unlock()
+		r.logger.Debugf("Got step context done before step run complete. Waiting up to 30 seconds for result.")
+		select {
+		case result = <-r.executionChannel:
+			// Successfully stopped before end of timeout.
+		case <-time.After(time.Duration(30) * time.Second):
+			r.logger.Warningf("Step %s/%s did not complete within the 30 second time limit. Force closing container.",
+				r.runID, r.pluginStepID)
+			if err := r.ForceClose(); err != nil {
+				r.logger.Warningf("Error in step %s/%s while closing plugin container (%s)", r.runID, r.pluginStepID, err)
+			}
+		}
 
-	r.stageChangeHandler.OnStageChange(
-		r,
-		&previousStage,
-		nil,
-		nil,
-		string(r.currentStage),
-		false)
+	}
 
-	r.lock.Lock()
-	r.state = step.RunningStepStateFinished
-	r.lock.Unlock()
-	r.stageChangeHandler.OnStepComplete(
-		r,
-		string(r.currentStage),
-		&result.outputID,
-		&result.outputData,
-	)
+	// Execution complete, move to state running stage outputs, then to state finished stage.
+	r.transitionStage(StageIDOutput, step.RunningStepStateRunning)
+	r.completeStage(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData)
 	return nil
 }
 
 func (r *runningStep) deployFailed(err error) {
-	r.lock.Lock()
-	previousStage := string(r.currentStage)
-	r.currentStage = StageIDDeployFailed
-	// Don't forget to update this, or else it will behave very oddly.
-	// First running, then finished. You can't skip states.
-	r.state = step.RunningStepStateRunning
-	r.lock.Unlock()
-
-	r.stageChangeHandler.OnStageChange(
-		r,
-		&previousStage,
-		nil,
-		nil,
-		string(StageIDDeployFailed),
-		false,
-	)
-	r.logger.Warningf("Plugin %s deploy failed. %v", r.step, err)
+	r.logger.Debugf("Deploy failed stage for step %s/%s", r.runID, r.pluginStepID)
+	r.transitionStage(StageIDDeployFailed, step.RunningStepStateRunning)
+	r.logger.Warningf("Plugin step %s/%s deploy failed. %v", r.runID, r.pluginStepID, err)
 
 	// Now it's done.
-	r.lock.Lock()
-	r.currentStage = StageIDDeployFailed
-	r.state = step.RunningStepStateFinished
-	r.lock.Unlock()
-
 	outputID := errorStr
 	output := any(DeployFailed{
 		Error: err.Error(),
 	})
-	r.stageChangeHandler.OnStepComplete(
-		r,
-		string(r.currentStage),
-		&outputID,
-		&output,
-	)
+	r.completeStage(StageIDDeployFailed, step.RunningStepStateFinished, &outputID, &output)
 }
 
 func (r *runningStep) startFailed(err error) {
-	r.lock.Lock()
-	previousStage := string(r.currentStage)
-	r.currentStage = StageIDCrashed
-	r.lock.Unlock()
-
-	r.stageChangeHandler.OnStageChange(
-		r,
-		&previousStage,
-		nil,
-		nil,
-		string(r.currentStage),
-		false)
-	r.logger.Warningf("Plugin step %s start failed. %v", r.step, err)
+	r.logger.Debugf("Start failed stage for step %s/%s", r.runID, r.pluginStepID)
+	r.transitionStage(StageIDCrashed, step.RunningStepStateRunning)
+	r.logger.Warningf("Plugin step %s/%s start failed. %v", r.runID, r.pluginStepID, err)
 
 	// Now it's done.
-	r.lock.Lock()
-	r.currentStage = StageIDCrashed
-	r.state = step.RunningStepStateFinished
-	r.lock.Unlock()
-
 	outputID := errorStr
 	output := any(Crashed{
 		Output: err.Error(),
 	})
-	r.stageChangeHandler.OnStepComplete(
-		r,
-		string(r.currentStage),
-		&outputID,
-		&output,
-	)
+
+	r.completeStage(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output)
 }
 
 func (r *runningStep) runFailed(err error) {
+	r.logger.Debugf("Run failed stage for step %s/%s", r.runID, r.pluginStepID)
+	r.transitionStage(StageIDCrashed, step.RunningStepStateRunning)
+	r.logger.Warningf("Plugin step %s/%s run failed. %v", r.runID, r.pluginStepID, err)
+
+	// Now it's done.
+	outputID := errorStr
+	output := any(Crashed{
+		Output: err.Error(),
+	})
+	r.completeStage(StageIDCrashed, step.RunningStepStateFinished, &outputID, &output)
+}
+
+// transitionStage transitions the stage to the specified stage, and the state to the specified state.
+//
+//nolint:unparam
+func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepState) {
 	// A current lack of observability into the atp client prevents
 	// non-fragile testing of this function.
 	r.lock.Lock()
 	previousStage := string(r.currentStage)
-	r.currentStage = StageIDCrashed
+	r.currentStage = newStage
 	// Don't forget to update this, or else it will behave very oddly.
-	// First running, then finished. You can't skip states.
 	r.state = step.RunningStepStateRunning
+	// First running, then finished. You can't skip states.
+	r.state = state
 	r.lock.Unlock()
-
 	r.stageChangeHandler.OnStageChange(
 		r,
 		&previousStage,
 		nil,
 		nil,
-		string(r.currentStage),
-		false)
-
-	r.logger.Warningf("Plugin step %s run failed. %v", r.step, err)
+		string(newStage),
+		false,
+		&r.wg,
+	)
+}
 
-	// Now it's done.
+//nolint:unparam
+func (r *runningStep) completeStage(currentStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) {
 	r.lock.Lock()
-	r.currentStage = StageIDCrashed
-	r.state = step.RunningStepStateFinished
+	previousStage := string(r.currentStage)
+	r.currentStage = currentStage
+	r.state = state
 	r.lock.Unlock()
-
-	outputID := errorStr
-	output := any(Crashed{
-		Output: err.Error(),
-	})
 	r.stageChangeHandler.OnStepComplete(
 		r,
-		string(r.currentStage),
-		&outputID,
-		&output,
+		previousStage,
+		outputID,
+		previousStageOutput,
+		&r.wg,
 	)
 }
diff --git a/internal/step/plugin/provider_test.go b/internal/step/plugin/provider_test.go
index de3d797f..2db5f39b 100644
--- a/internal/step/plugin/provider_test.go
+++ b/internal/step/plugin/provider_test.go
@@ -9,6 +9,7 @@ import (
 	"go.flow.arcalot.io/engine/internal/step"
 	"go.flow.arcalot.io/engine/internal/step/plugin"
 	testdeployer "go.flow.arcalot.io/testdeployer"
+	"sync"
 	"testing"
 )
 
@@ -16,7 +17,7 @@ type deployFailStageChangeHandler struct {
 	message chan string
 }
 
-func (s *deployFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) {
+func (s *deployFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) {
 
 }
 
@@ -25,6 +26,7 @@ func (s *deployFailStageChangeHandler) OnStepComplete(
 	previousStage string,
 	previousStageOutputID *string,
 	previousStageOutput *any,
+	_ *sync.WaitGroup,
 ) {
 	if previousStage != string(plugin.StageIDDeployFailed) {
 		panic(fmt.Errorf("invalid previous stage: %s", previousStage))
@@ -47,7 +49,7 @@ type startFailStageChangeHandler struct {
 	message chan string
 }
 
-func (s *startFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) {
+func (s *startFailStageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) {
 
 }
 
@@ -56,6 +58,7 @@ func (s *startFailStageChangeHandler) OnStepComplete(
 	previousStage string,
 	previousStageOutputID *string,
 	previousStageOutput *any,
+	_ *sync.WaitGroup,
 ) {
 	if previousStage != string(plugin.StageIDCrashed) {
 		panic(fmt.Errorf("invalid previous stage: %s", previousStage))
@@ -79,7 +82,7 @@ type stageChangeHandler struct {
 	message chan string
 }
 
-func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool) {
+func (s *stageChangeHandler) OnStageChange(_ step.RunningStep, _ *string, _ *string, _ *any, _ string, _ bool, _ *sync.WaitGroup) {
 
 }
 
@@ -88,6 +91,7 @@ func (s *stageChangeHandler) OnStepComplete(
 	previousStage string,
 	previousStageOutputID *string,
 	previousStageOutput *any,
+	_ *sync.WaitGroup,
 ) {
 	if previousStage != string(plugin.StageIDOutput) {
 		panic(fmt.Errorf("invalid previous stage: %s", previousStage))
@@ -141,8 +145,12 @@ func TestProvider_Utility(t *testing.T) {
 	_, err = runnable.Lifecycle(map[string]any{"step": "wait"})
 	assert.NoError(t, err)
 
-	_, err = runnable.Lifecycle(map[string]any{"step": nil})
+	_, err = runnable.Lifecycle(map[string]any{"step": "hello"})
 	assert.NoError(t, err)
+
+	// There is more than one step, so not specifying one will cause an error.
+	_, err = runnable.Lifecycle(map[string]any{"step": nil})
+	assert.Error(t, err)
 }

 func TestProvider_HappyError(t *testing.T) {
@@ -182,11 +190,11 @@ func TestProvider_HappyError(t *testing.T) {
 	}

 	// start with a step id that is not in the schema
-	_, err = runnable.Start(map[string]any{"step": "wrong_stepid"}, handler)
+	_, err = runnable.Start(map[string]any{"step": "wrong_stepid"}, t.Name(), handler)
 	assert.Error(t, err)

-	// default step id
-	running, err := runnable.Start(map[string]any{"step": nil}, handler)
+	// wait step
+	running, err := runnable.Start(map[string]any{"step": "wait"}, t.Name(), handler)
 	assert.NoError(t, err)

 	// non-existent stage
@@ -255,6 +263,59 @@ func TestProvider_HappyError(t *testing.T) {
 	})
 }

+func TestProvider_VerifyCancelSignal(t *testing.T) {
+	logger := log.New(
+		log.Config{
+			Level:       log.LevelError,
+			Destination: log.DestinationStdout,
+		},
+	)
+	workflowDeployerCfg := map[string]any{
+		"type": "test-impl",
+	}
+
+	deployerRegistry := deployer_registry.New(
+		deployer.Any(testdeployer.NewFactory()))
+
+	plp, err := plugin.New(
+		logger,
+		deployerRegistry,
+		workflowDeployerCfg,
+	)
+	assert.NoError(t, err)
+
+	runnable, err := plp.LoadSchema(
+		map[string]any{"plugin": "simulation"}, map[string][]byte{})
+	assert.NoError(t, err)
+	assert.NotNil(t, runnable)
+
+	waitLifecycle, err := runnable.Lifecycle(map[string]any{"step": "wait"})
+	assert.NoError(t, err)
+	// Verify that the expected lifecycle stage is there, then verify that cancel is enabled.
+	waitCancelledStageIDIndex := assert.SliceContainsExtractor(t,
+		func(schema step.LifecycleStageWithSchema) string {
+			return schema.ID
+		}, string(plugin.StageIDCancelled), waitLifecycle.Stages)
+	waitStageIDCancelled := waitLifecycle.Stages[waitCancelledStageIDIndex]
+	waitStopIfSchema := assert.MapContainsKey(t, "stop_if", waitStageIDCancelled.InputSchema)
+	if waitStopIfSchema.Disabled {
+		t.Fatalf("step wait's stop_if schema is disabled even though the cancel signal is present.")
+	}
+
+	helloLifecycle, err := runnable.Lifecycle(map[string]any{"step": "hello"})
+	assert.NoError(t, err)
+	// Verify that the expected lifecycle stage is there, then verify that cancel is disabled.
+ helloCancelledStageIDIndex := assert.SliceContainsExtractor(t, + func(schema step.LifecycleStageWithSchema) string { + return schema.ID + }, string(plugin.StageIDCancelled), helloLifecycle.Stages) + helloStageIDCancelled := helloLifecycle.Stages[helloCancelledStageIDIndex] + helloStopIfSchema := assert.MapContainsKey(t, "stop_if", helloStageIDCancelled.InputSchema) + if !helloStopIfSchema.Disabled { + t.Fatalf("step hello's stop_if schema is not disabled when the cancel signal is not present.") + } +} + func TestProvider_DeployFail(t *testing.T) { logConfig := log.Config{ Level: log.LevelError, @@ -289,8 +350,8 @@ func TestProvider_DeployFail(t *testing.T) { message: make(chan string), } - // default step id - running, err := runnable.Start(map[string]any{"step": nil}, handler) + // wait step + running, err := runnable.Start(map[string]any{"step": "wait"}, t.Name(), handler) assert.NoError(t, err) assert.NoError(t, running.ProvideStageInput( @@ -353,7 +414,7 @@ func TestProvider_StartFail(t *testing.T) { message: make(chan string), } - running, err := runnable.Start(map[string]any{"step": "wait"}, handler) + running, err := runnable.Start(map[string]any{"step": "wait"}, t.Name(), handler) assert.NoError(t, err) // tell deployer that this run should not succeed diff --git a/internal/step/provider.go b/internal/step/provider.go index 3e173675..b8722486 100644 --- a/internal/step/provider.go +++ b/internal/step/provider.go @@ -2,6 +2,7 @@ package step import ( "go.flow.arcalot.io/pluginsdk/schema" + "sync" ) // Provider is the description of an item that fits in a workflow. Its implementation provide the @@ -42,7 +43,8 @@ type StageChangeHandler interface { previousStageOutputID *string, previousStageOutput *any, newStage string, - waitingForInput bool, + inputAvailable bool, + wg *sync.WaitGroup, ) // OnStepComplete is called when the step has completed a final stage in its lifecycle and communicates the output. @@ -52,6 +54,7 @@ type StageChangeHandler interface { previousStage string, previousStageOutputID *string, previousStageOutput *any, + wg *sync.WaitGroup, ) } @@ -67,6 +70,7 @@ type RunnableStep interface { // match the RunSchema. Start( input map[string]any, + runID string, stageChangeHandler StageChangeHandler, ) (RunningStep, error) } @@ -81,7 +85,7 @@ const ( RunningStepStateWaitingForInput RunningStepState = "waiting_for_input" // RunningStepStateRunning indicates that the step is working. RunningStepStateRunning RunningStepState = "running" - // RunningStepStateFinished indicates that the step has finished. + // RunningStepStateFinished indicates that the step has finished, including failure cases. RunningStepStateFinished RunningStepState = "finished" ) @@ -97,4 +101,6 @@ type RunningStep interface { State() RunningStepState // Close shuts down the step and cleans up the resources associated with the step. Close() error + // ForceClose shuts down the step forcefully. 
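+	// Unlike Close, it is intended for teardown paths that cannot wait for a graceful shutdown.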
+ ForceClose() error } diff --git a/workflow/any.go b/workflow/any.go index 14000d6e..0040fa3c 100644 --- a/workflow/any.go +++ b/workflow/any.go @@ -33,6 +33,15 @@ func (a *anySchemaWithExpressions) Validate(data any) error { return err } +func (a *anySchemaWithExpressions) ValidateCompatibility(dataOrType any) error { + // If expression, resolve it before calling ValidateCompatibility + if _, ok := dataOrType.(expressions.Expression); ok { + // Assume okay + return nil + } + return a.anySchema.ValidateCompatibility(dataOrType) +} + func (a *anySchemaWithExpressions) Serialize(data any) (any, error) { return a.checkAndConvert(data) } diff --git a/workflow/executor.go b/workflow/executor.go index 070afe51..f1a4add4 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -30,7 +30,7 @@ func NewExecutor( return nil, fmt.Errorf("bug: no step registry passed to NewExecutor") } return &executor{ - logger: logger, + logger: logger.WithLabel("source", "executor"), stepRegistry: stepRegistry, config: config, }, nil @@ -122,7 +122,15 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte return nil, err } - // Stage 5: The output data model + // Stage 5: Verify stage inputs + // Now that the output properties are here and the internal data model is here, it should be possible to loop + // through them again to verify that all inputs are valid. So verify that all required inputs are present, schemas + // are valid, etc. + // Do this by looping through the steps' inputs, then verifying that the dag can provide them. + if err := e.verifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil { + return nil, err + } + // Stage 6: The output data model //goland:noinspection GoDeprecation if workflow.Output != nil { if len(workflow.Outputs) > 0 { @@ -277,6 +285,7 @@ func (e *executor) processSteps( nil, ) } + return runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, nil } @@ -323,6 +332,164 @@ func (e *executor) connectStepDependencies( return nil } +// verifyWorkflowStageInputs verifies the schemas of the step inputs. +func (e *executor) verifyWorkflowStageInputs( + workflow *Workflow, + workflowContext map[string][]byte, + stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema], + dag dgraph.DirectedGraph[*DAGItem], + internalDataModel *schema.ScopeSchema, +) error { + // Loop through all the steps in the engine + for stepID /*stepData is the unused key*/ := range workflow.Steps { + // Then loop through the stages of that step + lifecycle := stepLifecycles[stepID] + for _, stage := range lifecycle.Stages { + err := e.verifyStageInputs(dag, stepID, stage, workflowContext, internalDataModel) + if err != nil { + return err + } + } + } + return nil +} + +func (e *executor) verifyStageInputs( + dag dgraph.DirectedGraph[*DAGItem], + stepID string, + stage step.LifecycleStageWithSchema, + workflowContext map[string][]byte, + internalDataModel *schema.ScopeSchema, +) error { + // First, get the parsed inputs of the stage + parsedInputs, err := e.getStageInputs(dag, stepID, stage) + if err != nil { + return err + } + // Next, loop through the input schema fields. + for name, stageInputSchema := range stage.InputSchema { + providedInputForField := parsedInputs[name] + // Check if the field is present in the stage data. + // If it is NOT present and is NOT required, continue to next field. 
+ // If it is NOT present and IS required, fail + // If it IS present, verify whether schema is compatible with the schema of the provided data, + // then notify the provider that the data is present. + // This is running pre-workflow run, so you can check the schemas, but most fields won't be able to be + // resolved to an actual value. + if providedInputForField == nil { + // not present + if stageInputSchema.RequiredValue { + return fmt.Errorf("required input %s of type %s not found for step %s", + name, stageInputSchema.TypeID(), stepID) + } + } else { + // It is present, so make sure it is compatible. + err := e.preValidateCompatibility(internalDataModel, providedInputForField, stageInputSchema, workflowContext) + if err != nil { + return fmt.Errorf("input validation failed for workflow step %s stage %s (%w)", stepID, stage.ID, err) + } + } + } + return nil +} + +func (e *executor) getStageInputs( + dag dgraph.DirectedGraph[*DAGItem], + stepID string, + stage step.LifecycleStageWithSchema, +) (map[string]any, error) { + currentStageNode, err := dag.GetNodeByID(GetStageNodeID(stepID, stage.ID)) + if err != nil { + return nil, fmt.Errorf("bug: node for current stage not found (%w)", err) + } + // stageData provides the info needed for this node, without the expressions resolved. + stageData := currentStageNode.Item().Data + + // Use reflection to convert the stage's input data to a readable map. + parsedInputs := make(map[string]any) + if stageData != nil { + v := reflect.ValueOf(stageData) + if v.Kind() != reflect.Map { + return nil, fmt.Errorf("could not validate input. Stage data is not a map. It is %s", v.Kind()) + } + + for _, reflectedKey := range v.MapKeys() { + if reflectedKey.Kind() != reflect.Interface { + return nil, fmt.Errorf("expected input key to be interface of a string. Got %s", reflectedKey.Kind()) + } + // Now convert interface to string + key, ok := reflectedKey.Interface().(string) + if !ok { + return nil, fmt.Errorf("error converting input key to string") + } + value := v.MapIndex(reflectedKey).Interface() + parsedInputs[key] = value + } + } + return parsedInputs, nil +} + +func (e *executor) preValidateCompatibility(rootSchema schema.Scope, inputField any, propertySchema *schema.PropertySchema, + workflowContext map[string][]byte) error { + // Get the type/value structure + inputTypeStructure, err := e.createTypeStructure(rootSchema, inputField, workflowContext) + if err != nil { + return err + } + // Now validate + return propertySchema.ValidateCompatibility(inputTypeStructure) +} + +// createTypeStructure generates a structure of all the type information of the input field. +// When the literal is known, it includes the original value. +// When the literal is not known, but the schema is, it includes the value. +// When it encounters a map or list, it preserves it and recursively continues. +func (e *executor) createTypeStructure(rootSchema schema.Scope, inputField any, workflowContext map[string][]byte) (any, error) { + + // Expression, so the exact value may not be known yet. So just get the type from it. + if expr, ok := inputField.(expressions.Expression); ok { + // Is expression, so evaluate it. + e.logger.Debugf("Evaluating expression %s...", expr.String()) + return expr.Type(rootSchema, workflowContext) + } + + v := reflect.ValueOf(inputField) + switch v.Kind() { + case reflect.Slice: + // Okay. 
Construct the list of schemas, and pass it into the + + result := make([]any, v.Len()) + for i := 0; i < v.Len(); i++ { + value := v.Index(i).Interface() + newValue, err := e.createTypeStructure(rootSchema, value, workflowContext) + if err != nil { + return nil, fmt.Errorf("failed to resolve expressions (%w)", err) + } + result[i] = newValue + } + return result, nil + case reflect.Map: + result := make(map[string]any, v.Len()) + for _, reflectedKey := range v.MapKeys() { + key := reflectedKey.Interface() + keyAsStr, ok := key.(string) + if !ok { + return nil, fmt.Errorf("failed to generate type structure. Key is not of type string") + } + value := v.MapIndex(reflectedKey).Interface() + newValue, err := e.createTypeStructure(rootSchema, value, workflowContext) + if err != nil { + return nil, fmt.Errorf("failed to resolve expressions (%w)", err) + } + result[keyAsStr] = newValue + } + return result, nil + default: + // Not an expression, so it's actually data. Just return the input + return inputField, nil + } +} + // buildInternalDataModel builds an internal data model that the expressions can query. func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperties map[string]*schema.PropertySchema) *schema.ScopeSchema { internalDataModel := schema.NewScopeSchema( diff --git a/workflow/executor_test.go b/workflow/executor_test.go index cec272dd..80fae2c4 100644 --- a/workflow/executor_test.go +++ b/workflow/executor_test.go @@ -3,16 +3,76 @@ package workflow_test import ( "context" "fmt" + "go.arcalot.io/assert" + "go.flow.arcalot.io/deployer" + deployerregistry "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/config" + "go.flow.arcalot.io/engine/internal/step" + "go.flow.arcalot.io/engine/internal/step/plugin" + testimpl "go.flow.arcalot.io/testdeployer" "testing" "go.arcalot.io/lang" "go.arcalot.io/log/v2" "go.flow.arcalot.io/engine/internal/step/dummy" - "go.flow.arcalot.io/engine/internal/step/registry" + stepregistry "go.flow.arcalot.io/engine/internal/step/registry" "go.flow.arcalot.io/engine/workflow" ) +func getTestImplPreparedWorkflow(t *testing.T, workflowDefinition string) (workflow.ExecutableWorkflow, error) { + logger := log.NewLogger(log.LevelDebug, log.NewTestWriter(t)) + cfg := &config.Config{ + LoggedOutputConfigs: map[string]*config.StepOutputLogConfig{ + "terminated_early": { + LogLevel: log.LevelError, + }, + }, + } + stepRegistry := NewTestImplStepRegistry(logger, t) + + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := assert.NoErrorR[*workflow.Workflow](t)(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(workflowDefinition))) + return executor.Prepare(wf, map[string][]byte{}) +} + +func getDummyDeployerPreparedWorkflow(t *testing.T, workflowDefinition string) (workflow.ExecutableWorkflow, error) { + logger := log.NewLogger(log.LevelDebug, log.NewTestWriter(t)) + cfg := &config.Config{} + stepRegistry := assert.NoErrorR[step.Registry](t)(stepregistry.New( + dummy.New(), + )) + executor := assert.NoErrorR[workflow.Executor](t)(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := assert.NoErrorR[*workflow.Workflow](t)(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(workflowDefinition))) + return executor.Prepare(wf, map[string][]byte{}) +} + +func NewTestImplStepRegistry( + logger log.Logger, + t *testing.T, +) step.Registry { + deployerRegistry := deployerregistry.New( + deployer.Any(testimpl.NewFactory()), + ) + + pluginProvider := assert.NoErrorR[step.Provider](t)( 
+ plugin.New(logger, deployerRegistry, map[string]interface{}{ + "type": "test-impl", + "deploy_time": "0", + }), + ) + return assert.NoErrorR[step.Registry](t)(stepregistry.New( + pluginProvider, + )) +} + var sharedInputWorkflowYAML = `--- version: v0.1.0 input: @@ -27,6 +87,7 @@ input: steps: say_hi: kind: dummy + # Both name and nickname reference the same variable name: !expr $.input.name nickname: !expr $.input.name output: @@ -38,27 +99,9 @@ output: // and one step-output going into two step-outputs. // These cause duplicate connections to be made, which need to be handled properly. func TestSharedInput(t *testing.T) { - logger := log.NewLogger(log.LevelDebug, log.NewTestWriter(t)) - stepRegistry := lang.Must2(registry.New( - dummy.New(), - )) - - executor, err := workflow.NewExecutor(logger, &config.Config{}, stepRegistry) - if err != nil { - t.Fatalf("Failed to create Executor, %e", err) - } - - yamlConverter := workflow.NewYAMLConverter(stepRegistry) - decodedWorkflow, err := yamlConverter.FromYAML([]byte(sharedInputWorkflowYAML)) - if err != nil { - t.Fatalf("Failed to load workflow from YAML, %e", err) - } - - preparedWorkflow, err := executor.Prepare(decodedWorkflow, nil) - if err != nil { - t.Fatalf("Failed to prepare workflow, %e", err) - } - + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getDummyDeployerPreparedWorkflow(t, sharedInputWorkflowYAML), + ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() outputID, outputData, err := preparedWorkflow.Execute(ctx, map[string]any{ @@ -70,3 +113,109 @@ func TestSharedInput(t *testing.T) { fmt.Printf("%s: %s\n", outputID, outputData.(map[any]any)["message"]) // Output: success: Hello Arca Lot! } + +var missingInputWorkflowDefinition1 = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + incomplete_wait: + plugin: "n/a" + step: wait + # Missing input +outputs: + a: + b: !expr $.steps.incomplete_wait.outputs +` + +var missingInputWorkflowDefinition2 = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + say_hi: + kind: dummy + # Missing name +outputs: + a: + b: !expr $.steps.say_hi.greet +` + +func TestMissingInput(t *testing.T) { + // For this test, a workflow's step will be missing its inputs. 
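+	// Preparation should fail in the new stage-input verification pass, before any step runs.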
+ _, err := getTestImplPreparedWorkflow(t, missingInputWorkflowDefinition1) + assert.Error(t, err) + + _, err = getDummyDeployerPreparedWorkflow(t, missingInputWorkflowDefinition2) + assert.Error(t, err) +} + +var mismatchedStepInputTypesWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + wait_1: + plugin: "n/a" + step: wait + input: + wait_time_ms: 0 + wait_2: + plugin: "n/a" + step: wait + input: + # Should fail during preparation, due to message being a string, and wait_time_ms expecting an int + wait_time_ms: !expr $.steps.wait_1.outputs.success.message +outputs: + a: + b: !expr $.steps.wait_2.outputs +` + +func TestMismatchedStepInputTypes(t *testing.T) { + _, err := getTestImplPreparedWorkflow(t, mismatchedStepInputTypesWorkflowDefinition) + assert.Error(t, err) +} + +var mismatchedInputTypesWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: + a: + display: + description: "Just for testing" + name: "a" + required: true + type: + type_id: string +steps: + wait_1: + plugin: "n/a" + step: wait + input: + # This is trying to put a string into an int field + wait_time_ms: !expr $.input.a +outputs: + a: + b: !expr $.steps.wait_1.outputs +` + +func TestMismatchedInputTypes(t *testing.T) { + _, err := getTestImplPreparedWorkflow(t, mismatchedInputTypesWorkflowDefinition) + assert.Error(t, err) +} diff --git a/workflow/model.go b/workflow/model.go index f5ff010c..5ff2b02a 100644 --- a/workflow/model.go +++ b/workflow/model.go @@ -229,7 +229,7 @@ type ErrNoMorePossibleSteps struct { // Error returns an explanation on why the error happened. func (e ErrNoMorePossibleSteps) Error() string { - var outputs []string //nolint:prealloc + var outputsUnmetDependencies []string //nolint:prealloc for _, node := range e.dag.ListNodes() { if node.Item().Kind != DAGItemKindOutput { continue @@ -242,7 +242,24 @@ func (e ErrNoMorePossibleSteps) Error() string { for i := range inbound { unmetDependencies = append(unmetDependencies, i) } - outputs = append(outputs, fmt.Sprintf("%s: %s", node.Item().OutputID, strings.Join(unmetDependencies, ", "))) + outputsUnmetDependencies = append( + outputsUnmetDependencies, + fmt.Sprintf("%s: %s", node.Item().OutputID, strings.Join(unmetDependencies, ", ")), + ) } - return fmt.Sprintf("no steps running, no more executable steps, cannot construct any output (outputs have the following dependencies: %s)", strings.Join(outputs, "; ")) + return fmt.Sprintf( + "no steps running, no more executable steps, cannot construct any output (outputs have the following dependencies: %s)", + strings.Join(outputsUnmetDependencies, "; "), + ) +} + +// ErrInvalidState indicates that the workflow failed due to an invalid state. +type ErrInvalidState struct { + processingSteps int + msg string +} + +// Error returns an explanation on why the error happened. +func (e ErrInvalidState) Error() string { + return fmt.Sprintf("Workflow failed due to invalid state (%s). 
Processing steps: %d", e.msg, e.processingSteps) } diff --git a/workflow/workflow.go b/workflow/workflow.go index 044f9cb2..9ffab54d 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "sync" + "time" "go.flow.arcalot.io/engine/config" @@ -59,7 +60,7 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s defer cancel() l := &loopState{ - logger: e.logger, + logger: e.logger.WithLabel("source", "workflow"), config: e.config, lock: &sync.Mutex{}, data: map[string]any{ @@ -73,8 +74,11 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s outputDone: false, cancel: cancel, workflowContext: e.workflowContext, + recentErrors: make(chan error, 20), // Big buffer in case there are lots of subsequent errors. } + l.lock.Lock() + // Iterate over all steps to set them up with proper handlers, then launch them. // Even though they're launched, the workflow won't execute until the input is provided. for stepID, runnableStep := range e.runnableSteps { @@ -92,36 +96,52 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s l.data["steps"].(map[string]any)[stepID] = stepDataModel var stageHandler step.StageChangeHandler = &stageChangeHandler{ - onStageChange: func(step step.RunningStep, previousStage *string, previousStageOutputID *string, previousStageOutput *any, stage string, waitingForInput bool) { + onStageChange: func( + step step.RunningStep, + previousStage *string, + previousStageOutputID *string, + previousStageOutput *any, + stage string, + inputAvailable bool, + wg *sync.WaitGroup, + ) { waitingForInputText := "" - if waitingForInput { + if !inputAvailable { waitingForInputText = " and is waiting for input" } - e.logger.Debugf("Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) - l.onStageComplete(stepID, previousStage, previousStageOutputID, previousStageOutput) + e.logger.Debugf("START Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) + l.onStageComplete(stepID, previousStage, previousStageOutputID, previousStageOutput, wg) + e.logger.Debugf("DONE Stage change for step %s to %s%s...", stepID, stage, waitingForInputText) }, - onStepComplete: func(step step.RunningStep, previousStage string, previousStageOutputID *string, previousStageOutput *any) { + onStepComplete: func( + step step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, + wg *sync.WaitGroup, + ) { if previousStageOutputID != nil { e.logger.Debugf("Step %s completed with stage '%s', output '%s'...", stepID, previousStage, *previousStageOutputID) } else { e.logger.Debugf("Step %s completed with stage '%s'...", stepID, previousStage) } - l.onStageComplete(stepID, &previousStage, previousStageOutputID, previousStageOutput) + l.onStageComplete(stepID, &previousStage, previousStageOutputID, previousStageOutput, wg) }, } e.logger.Debugf("Launching step %s...", stepID) - runningStep, err := runnableStep.Start(e.stepRunData[stepID], stageHandler) + runningStep, err := runnableStep.Start(e.stepRunData[stepID], stepID, stageHandler) if err != nil { return "", nil, fmt.Errorf("failed to launch step %s (%w)", stepID, err) } l.runningSteps[stepID] = runningStep } + l.lock.Unlock() // Let's make sure we are closing all steps once this function terminates so we don't leave stuff running. 
defer func() { e.logger.Debugf("Terminating all steps...") for stepID, runningStep := range l.runningSteps { e.logger.Debugf("Terminating step %s...", stepID) - if err := runningStep.Close(); err != nil { + if err := runningStep.ForceClose(); err != nil { panic(fmt.Errorf("failed to close step %s (%w)", stepID, err)) } } @@ -171,9 +191,10 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s } return outputDataEntry.outputID, outputData, nil case <-ctx.Done(): - e.logger.Debugf("Workflow execution aborted. %s", l.lastError) - if l.lastError != nil { - return "", nil, l.lastError + lastErr := l.getLastError() + e.logger.Debugf("Workflow execution aborted. %s", lastErr) + if lastErr != nil { + return "", nil, lastErr } return "", nil, fmt.Errorf("workflow execution aborted (%w)", ctx.Err()) } @@ -197,14 +218,42 @@ type loopState struct { runningSteps map[string]step.RunningStep outputDataChannel chan outputDataType outputDone bool - lastError error + recentErrors chan error cancel context.CancelFunc workflowContext map[string][]byte } -func (l *loopState) onStageComplete(stepID string, previousStage *string, previousStageOutputID *string, previousStageOutput *any) { +// getLastError gathers the last errors. If there are several, it creates a new one that consolidates them. +// This will read from the channel. Calling again will only gather new errors since the last call. +func (l *loopState) getLastError() error { + var errors []error +errGatherLoop: + for { + select { + case err := <-l.recentErrors: + errors = append(errors, err) + default: + break errGatherLoop // No more errors + } + } + switch len(errors) { + case 0: + return nil + case 1: + return errors[0] + default: + return fmt.Errorf("multiple errors: %v", errors) + } +} + +func (l *loopState) onStageComplete(stepID string, previousStage *string, previousStageOutputID *string, previousStageOutput *any, wg *sync.WaitGroup) { l.lock.Lock() - defer l.lock.Unlock() + defer func() { + if previousStage != nil { + l.checkForDeadlocks(3, wg) + } + l.lock.Unlock() + }() if previousStage == nil { return @@ -212,13 +261,14 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo stageNode, err := l.dag.GetNodeByID(GetStageNodeID(stepID, *previousStage)) if err != nil { l.logger.Errorf("Failed to get stage node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) - l.lastError = fmt.Errorf("failed to get stage node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) + l.recentErrors <- fmt.Errorf("failed to get stage node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) l.cancel() return } + l.logger.Debugf("Removed node '%s' from the DAG", stageNode.ID()) if err := stageNode.Remove(); err != nil { l.logger.Errorf("Failed to remove stage node ID %s (%w)", stageNode.ID(), err) - l.lastError = fmt.Errorf("failed to remove stage node ID %s (%w)", stageNode.ID(), err) + l.recentErrors <- fmt.Errorf("failed to remove stage node ID %s (%w)", stageNode.ID(), err) l.cancel() return } @@ -226,13 +276,15 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo outputNode, err := l.dag.GetNodeByID(GetOutputNodeID(stepID, *previousStage, *previousStageOutputID)) if err != nil { l.logger.Errorf("Failed to get output node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) - l.lastError = fmt.Errorf("failed to get output node ID %s (%w)", GetStageNodeID(stepID, *previousStage), err) + l.recentErrors <- fmt.Errorf("failed to get output node ID %s 
(%w)", GetStageNodeID(stepID, *previousStage), err) l.cancel() return } + // Removes the node from the DAG. This results in the nodes not having inbound connections, allowing them to be processed. + l.logger.Debugf("Removed node '%s' from the DAG", outputNode.ID()) if err := outputNode.Remove(); err != nil { l.logger.Errorf("Failed to remove output node ID %s (%w)", outputNode.ID(), err) - l.lastError = fmt.Errorf("failed to remove output node ID %s (%w)", outputNode.ID(), err) + l.recentErrors <- fmt.Errorf("failed to remove output node ID %s (%w)", outputNode.ID(), err) l.cancel() return } @@ -257,7 +309,7 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo // notifySteps is a function we can call to go through all DAG nodes that have no inbound connections and // provide step inputs based on expressions. -// The lock should be acquired when this is called. +// The lock should be acquired by the caller before this is called. func (l *loopState) notifySteps() { //nolint:gocognit // This function goes through the DAG and feeds the input to all steps that have no further inbound // dependencies. @@ -302,33 +354,33 @@ func (l *loopState) notifySteps() { //nolint:gocognit // Tries to match the schema if _, err := node.Item().DataSchema.Unserialize(untypedInputData); err != nil { l.logger.Errorf("Bug: schema evaluation resulted in invalid data for %s (%v)", node.ID(), err) - l.lastError = fmt.Errorf("bug: schema evaluation resulted in invalid data for %s (%w)", node.ID(), err) + l.recentErrors <- fmt.Errorf("bug: schema evaluation resulted in invalid data for %s (%w)", node.ID(), err) l.cancel() return } // This check is here just to make sure it has the required fields set - if node.Item().StepID != "" && node.Item().StageID != "" { - stageInputData := untypedInputData.(map[any]any) - typedInputData := make(map[string]any, len(stageInputData)) - for k, v := range stageInputData { - typedInputData[k.(string)] = v - } - // Sends it to the plugin - l.logger.Debugf("Providing stage input for %s...", nodeID) - if err := l.runningSteps[node.Item().StepID].ProvideStageInput( - node.Item().StageID, - typedInputData, - ); err != nil { - l.logger.Errorf("Bug: failed to provide input to step %s (%w)", node.Item().StepID, err) - l.lastError = fmt.Errorf("bug: failed to provide input to step %s (%w)", node.Item().StepID, err) - l.cancel() - return - } - } else { + if node.Item().StepID == "" || node.Item().StageID == "" { // This shouldn't happen panic("Step or stage ID missing") } + + stageInputData := untypedInputData.(map[any]any) + typedInputData := make(map[string]any, len(stageInputData)) + for k, v := range stageInputData { + typedInputData[k.(string)] = v + } + // Sends it to the plugin + l.logger.Debugf("Providing stage input for %s...", nodeID) + if err := l.runningSteps[node.Item().StepID].ProvideStageInput( + node.Item().StageID, + typedInputData, + ); err != nil { + l.logger.Errorf("Bug: failed to provide input to step %s (%w)", node.Item().StepID, err) + l.recentErrors <- fmt.Errorf("bug: failed to provide input to step %s (%w)", node.Item().StepID, err) + l.cancel() + return + } case DAGItemKindOutput: // We have received enough data to construct the workflow output. l.logger.Debugf("Constructing workflow output.") @@ -354,17 +406,29 @@ func (l *loopState) notifySteps() { //nolint:gocognit } } } - // Here we make sure we don't have a deadlock. 
+} + +type stateCounters struct { + starting int + waitingWithInbound int + waitingWithoutInbound int + running int + finished int +} + +func (l *loopState) countStates() stateCounters { counters := struct { - starting int - waiting int - running int - finished int + starting int + waitingWithInbound int + waitingWithoutInbound int + running int + finished int }{ 0, 0, 0, 0, + 0, } for stepID, runningStep := range l.runningSteps { switch runningStep.State() { @@ -372,20 +436,25 @@ func (l *loopState) notifySteps() { //nolint:gocognit counters.starting++ l.logger.Debugf("Step %s is currently starting.", stepID) case step.RunningStepStateWaitingForInput: - counters.waiting++ - connectionsMsg := "" dagNode, err := l.dag.GetNodeByID(GetStageNodeID(stepID, runningStep.CurrentStage())) switch { case err != nil: l.logger.Warningf("Failed to get DAG node for the debug message (%w)", err) + counters.waitingWithInbound++ case dagNode == nil: l.logger.Warningf("Failed to get DAG node for the debug message. Returned nil", err) + counters.waitingWithInbound++ default: inboundConnections, err := dagNode.ListInboundConnections() if err != nil { l.logger.Warningf("Error while listing inbound connections. (%w)", err) } + if len(inboundConnections) > 0 { + counters.waitingWithInbound++ + } else { + counters.waitingWithoutInbound++ + } i := 0 for k := range inboundConnections { @@ -405,20 +474,38 @@ func (l *loopState) notifySteps() { //nolint:gocognit l.logger.Debugf("Step %s is currently finished.", stepID) } } + return counters +} + +func (l *loopState) checkForDeadlocks(retries int, wg *sync.WaitGroup) { + // Here we make sure we don't have a deadlock. + counters := l.countStates() l.logger.Infof( - "There are currently %d steps starting, %d waiting for input, %d running, %d finished", + "There are currently %d steps starting, %d waiting for input, %d ready for input, %d running, %d finished", counters.starting, - counters.waiting, + counters.waitingWithInbound, + counters.waitingWithoutInbound, counters.running, counters.finished, ) - if counters.starting == 0 && counters.running == 0 && !l.outputDone { - l.lastError = &ErrNoMorePossibleSteps{ - l.dag, + if counters.starting == 0 && counters.running == 0 && counters.waitingWithoutInbound == 0 && !l.outputDone { + if retries <= 0 { + l.recentErrors <- &ErrNoMorePossibleSteps{ + l.dag, + } + l.logger.Debugf("DAG:\n%s", l.dag.Mermaid()) + l.cancel() + } else { + // Retry. There are times when all the steps are in a transition state. + // Retrying will delay the check until after they are done with the transition. + l.logger.Warningf("No running steps. Rechecking...") + wg.Add(1) + go func() { + time.Sleep(5 * time.Millisecond) + l.checkForDeadlocks(retries-1, wg) + wg.Done() + }() } - l.logger.Errorf("%v", l.lastError) - l.logger.Debugf("DAG:\n%s", l.dag.Mermaid()) - l.cancel() } } @@ -459,15 +546,44 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error } } +// stageChangeHandler is implementing step.StageChangeHandler. 
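+// It forwards each callback to the closures that Execute wires up, including the WaitGroup used by the deadlock recheck.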
type stageChangeHandler struct { - onStageChange func(step step.RunningStep, previousStage *string, previousStageOutputID *string, previousStageOutput *any, stage string, waitingForInput bool) - onStepComplete func(step step.RunningStep, previousStage string, previousStageOutputID *string, previousStageOutput *any) + onStageChange func( + step step.RunningStep, + previousStage *string, + previousStageOutputID *string, + previousStageOutput *any, + stage string, + waitingForInput bool, + wg *sync.WaitGroup, + ) + onStepComplete func( + step step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, + wg *sync.WaitGroup, + ) } -func (s stageChangeHandler) OnStageChange(step step.RunningStep, previousStage *string, previousStageOutputID *string, previousStageOutput *any, stage string, waitingForInput bool) { - s.onStageChange(step, previousStage, previousStageOutputID, previousStageOutput, stage, waitingForInput) +func (s stageChangeHandler) OnStageChange( + step step.RunningStep, + previousStage *string, + previousStageOutputID *string, + previousStageOutput *any, + stage string, + waitingForInput bool, + wg *sync.WaitGroup, +) { + s.onStageChange(step, previousStage, previousStageOutputID, previousStageOutput, stage, waitingForInput, wg) } -func (s stageChangeHandler) OnStepComplete(step step.RunningStep, previousStage string, previousStageOutputID *string, previousStageOutput *any) { - s.onStepComplete(step, previousStage, previousStageOutputID, previousStageOutput) +func (s stageChangeHandler) OnStepComplete( + step step.RunningStep, + previousStage string, + previousStageOutputID *string, + previousStageOutput *any, + wg *sync.WaitGroup, +) { + s.onStepComplete(step, previousStage, previousStageOutputID, previousStageOutput, wg) } diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 7faff9de..f7f56254 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -3,21 +3,13 @@ package workflow_test import ( "context" "errors" - "testing" - "time" - - "go.flow.arcalot.io/deployer" - "go.flow.arcalot.io/engine/internal/step" - "go.flow.arcalot.io/engine/internal/step/plugin" - testimpl "go.flow.arcalot.io/testdeployer" - "go.arcalot.io/assert" "go.arcalot.io/lang" "go.arcalot.io/log/v2" - deployerregistry "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/config" - "go.flow.arcalot.io/engine/internal/step/dummy" - stepregistry "go.flow.arcalot.io/engine/internal/step/registry" + "testing" + "time" + "go.flow.arcalot.io/engine/workflow" ) @@ -41,26 +33,9 @@ output: ` func TestOutputFailed(t *testing.T) { - logConfig := log.Config{ - Level: log.LevelError, - Destination: log.DestinationStdout, - } - logger := log.New( - logConfig, + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getDummyDeployerPreparedWorkflow(t, badWorkflowDefinition), ) - cfg := &config.Config{ - Log: logConfig, - } - stepRegistry := lang.Must2(stepregistry.New( - dummy.New(), - )) - executor := lang.Must2(workflow.NewExecutor( - logger, - cfg, - stepRegistry, - )) - wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(badWorkflowDefinition))) - preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) _, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{"name": "Arca Lot"}) assert.Nil(t, outputData) assert.Error(t, err) @@ -89,64 +64,186 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 0 + # It needs to be long enough for it to 
ensure that long_wait is in a running state. + # The other case will be tested separately. + wait_time_ms: 20 outputs: a: cancelled_step_output: !expr $.steps.long_wait.outputs ` -func NewTestImplStepRegistry( - logger log.Logger, - t *testing.T, -) step.Registry { - deployerRegistry := deployerregistry.New( - deployer.Any(testimpl.NewFactory()), +func TestStepCancellation(t *testing.T) { + // For this test, a simple workflow will run wait steps, with one that's + // supposed to be stopped when the first stops. + // The long one will be long enough that there is no reasonable way + // for it to finish before the first step. + // The test double deployer will be used for this test, as we + // need a deployer to test the plugin step provider. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, stepCancellationWorkflowDefinition), ) + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "a") + stepResult := assert.MapContainsKeyAny(t, "cancelled_step_output", outputData.(map[any]any)) + assert.MapContainsKey(t, "cancelled_early", stepResult.(map[string]any)) +} + +var earlyStepCancellationWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + # This one needs to run longer than the total time expected of all the other steps, with + # a large enough difference to prevent timing errors breaking the test. + end_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 80 + # Delay needs to be delayed long enough to ensure that last_step isn't running when it's cancelled by short_wait + delay: + plugin: "n/a" + step: wait + input: + wait_time_ms: 50 + last_step: + plugin: "n/a" + step: wait + input: + wait_time_ms: 0 + # Delay it so it doesn't run, and gets cancelled before deployment. + wait_for: !expr $.steps.delay.outputs + # You can verify that this test works by commenting out this line. It should fail. + stop_if: !expr $.steps.short_wait.outputs + short_wait: + plugin: "n/a" + step: wait + input: + # End the test quickly. + wait_time_ms: 0 +outputs: + # If not properly cancelled, fail_case will have output. + fail_case: + unattainable: !expr $.steps.last_step.outputs + correct_case: + a: !expr $.steps.end_wait.outputs +` - pluginProvider := assert.NoErrorR[step.Provider](t)( - plugin.New(logger, deployerRegistry, map[string]interface{}{ - "type": "test-impl", - "deploy_time": "0", - }), +func TestEarlyStepCancellation(t *testing.T) { + // For this test, a simple workflow will run wait steps, with the workflow + // The long one will be long enough that there is no reasonable way + // for it to finish before the first step. + // The test double deployer will be used for this test, as we + // need a deployer to test the plugin step provider. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, earlyStepCancellationWorkflowDefinition), ) - return assert.NoErrorR[step.Registry](t)(stepregistry.New( - pluginProvider, - )) + startTime := time.Now() // Right before execute to not include pre-processing time. + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + duration := time.Since(startTime) + t.Logf("Test execution time: %s", duration) + // A nil value means the output could not be constructed, which is intended due to us cancelling the step it depends on. 
+ // If it's not nil, that means the step didn't get cancelled. + assert.NoError(t, err) + assert.Equals(t, outputID, "correct_case") + // All steps that can result in output are 0 ms, so just leave some time for processing. + assert.LessThan(t, duration.Milliseconds(), 200) } -func TestStepCancellation(t *testing.T) { - // For this test, a simple workflow will run wait steps, with one that's - // supposed to be stopped when the first stops. +var deploymentStepCancellationWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + # This one needs to run longer than the total time expected of all the other steps, with + # a large enough difference to prevent timing errors breaking the test. + end_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 100 + step_to_cancel: + plugin: "n/a" + step: wait + input: + wait_time_ms: 0 + # You can verify that this test works by commenting out this line. It should fail. + stop_if: !expr $.steps.short_wait.outputs + # Delay needs to be delayed long enough to ensure that it's in a deploy state when it's cancelled by short_wait + deploy: + type: "test-impl" + deploy_time: 50 # 50 ms + short_wait: + plugin: "n/a" + step: wait + input: + # End the test quickly. + wait_time_ms: 0 +outputs: + # If not properly cancelled, fail_case will have output. + fail_case: + unattainable: !expr $.steps.step_to_cancel.outputs + correct_case: + a: !expr $.steps.end_wait.outputs +` + +func TestDeploymentStepCancellation(t *testing.T) { + // For this test, a simple workflow will run wait steps, with the workflow // The long one will be long enough that there is no reasonable way // for it to finish before the first step. // The test double deployer will be used for this test, as we // need a deployer to test the plugin step provider. - logConfig := log.Config{ - Level: log.LevelInfo, - Destination: log.DestinationStdout, - } - logger := log.New( - logConfig, + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, deploymentStepCancellationWorkflowDefinition), ) - cfg := &config.Config{ - Log: logConfig, - } - stepRegistry := NewTestImplStepRegistry(logger, t) + startTime := time.Now() // Right before execute to not include pre-processing time. + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + duration := time.Since(startTime) + t.Logf("Test execution time: %s", duration) + // A nil value means the output could not be constructed, which is intended due to us cancelling the step it depends on. + // If it's not nil, that means the step didn't get cancelled. + assert.NoError(t, err) + assert.Equals(t, outputID, "correct_case") + // All steps that can result in output are 0 ms, so just leave some time for processing. 
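+	// (end_wait itself runs for 100 ms here, so the 200 ms bound leaves roughly 100 ms of slack.)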
+ assert.LessThan(t, duration.Milliseconds(), 200) +} - executor := lang.Must2(workflow.NewExecutor( - logger, - cfg, - stepRegistry, - )) - wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(stepCancellationWorkflowDefinition))) - preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) - outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) +var simpleValidLiteralInputWaitWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + wait_1: + plugin: "n/a" + step: wait + input: + wait_time_ms: 0 +outputs: + a: + b: !expr $.steps.wait_1.outputs +` + +func TestSimpleValidWaitWorkflow(t *testing.T) { + // Just a single wait + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, simpleValidLiteralInputWaitWorkflowDefinition), + ) + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) assert.NoError(t, err) assert.Equals(t, outputID, "a") - stepResult := outputData.(map[interface{}]interface{})["cancelled_step_output"] - assert.NotNil(t, stepResult) - stepResultCancelledEarly := stepResult.(map[string]interface{})["cancelled_early"] - assert.NotNil(t, stepResultCancelledEarly) } var waitForSerialWorkflowDefinition = ` @@ -162,12 +259,13 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 500 + # Note: 5ms left only a 2.5ms margin for error. 10ms left almost 6ms. So 10ms min is recommended. + wait_time_ms: 10 second_wait: plugin: "n/a" step: wait input: - wait_time_ms: 500 + wait_time_ms: 10 wait_for: !expr $.steps.first_wait.outputs.success outputs: success: @@ -183,7 +281,6 @@ func TestWaitForSerial(t *testing.T) { // as each step runs for 5s and are run serially // The test double deployer will be used for this test, as we // need a deployer to test the plugin step provider. - startTime := time.Now() logConfig := log.Config{ Level: log.LevelInfo, Destination: log.DestinationStdout, @@ -203,6 +300,7 @@ func TestWaitForSerial(t *testing.T) { )) wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialWorkflowDefinition))) preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) + startTime := time.Now() // Right before execute to not include pre-processing time. outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) assert.NoError(t, err) assert.Equals(t, outputID, "success") @@ -217,20 +315,14 @@ func TestWaitForSerial(t *testing.T) { duration := time.Since(startTime) t.Logf("Test execution time: %s", duration) - var waitSuccess bool - if duration >= 1*time.Second { - waitSuccess = true - t.Logf("Test execution time is greater than 1 second, steps are running serially due to the wait_for condition.") + if duration >= 20*time.Millisecond { + t.Logf("Test execution time is greater than 20 milliseconds; steps are correctly running serially due to the wait_for condition.") } else { - waitSuccess = false - t.Logf("Test execution time is lesser than 1 seconds, steps are not running serially.") + t.Fatalf("Test execution time is less than 20 milliseconds; steps are not running serially.") } - assert.Equals(t, waitSuccess, true) } -// Running parallel steps which wait on the same previous step sometimes causes a race condition. This needs to be investigated. -// once the race condition if fixed reduce the wait_time to 500ms. 
-var waitForParallelWorkflowDefinition = ` +var missingInputsFailedDeploymentWorkflowDefinition = ` version: v0.1.0 input: root: RootObject @@ -239,74 +331,68 @@ input: id: RootObject properties: {} steps: - first_wait: + wait_1: plugin: "n/a" step: wait input: - wait_time_ms: 5000 - second_wait: + wait_time_ms: 0 + deploy: + type: "test-impl" + #deploy_time: 20000 # 10 ms + deploy_succeed: false + wait_2: plugin: "n/a" step: wait + wait_for: !expr $.steps.wait_1.outputs.success input: - wait_time_ms: 5000 - wait_for: !expr $.steps.first_wait.outputs.success - third_wait: + wait_time_ms: 0 +outputs: + a: + b: !expr $.steps.wait_2.outputs +` + +func TestMissingInputsFailedDeployment(t *testing.T) { + // For this test, the workflow should fail, not deadlock, due to no inputs possible. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, missingInputsFailedDeploymentWorkflowDefinition), + ) + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.Error(t, err) + assert.Equals(t, outputID, "") +} + +var missingInputsWrongOutputWorkflowDefinition = ` +version: v0.1.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + wait_1: plugin: "n/a" step: wait input: - wait_time_ms: 5000 - wait_for: !expr $.steps.first_wait.outputs.success + wait_time_ms: 0 + wait_2: + plugin: "n/a" + step: wait + # No stop_if, so this shouldn't happen. + wait_for: !expr $.steps.wait_1.outputs.cancelled_early + input: + wait_time_ms: 0 outputs: - success: - third_step_output: !expr $.steps.third_wait.outputs - second_step_output: !expr $.steps.second_wait.outputs + a: + b: !expr $.steps.wait_2.outputs ` -func TestWaitForParallel(t *testing.T) { - // For this test, a workflow runs three steps, where each step runs a wait step for 5s - // The second and third wait steps wait for the first to succeed after which they both run in parallel - // The total execution time for this test function should be greater than 5s but lesser than 15s - // as the first step runs for 5s and other two steps run in parallel after the first succeeds - // The test double deployer will be used for this test, as we - // need a deployer to test the plugin step provider. - startTime := time.Now() - logConfig := log.Config{ - Level: log.LevelInfo, - Destination: log.DestinationStdout, - } - logger := log.New( - logConfig, +func TestMissingInputsWrongOutput(t *testing.T) { + // For this test, the workflow should fail, not deadlock, due to no inputs possible. 
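+	// wait_2 depends on wait_1's cancelled_early output, which is never emitted because nothing cancels wait_1.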
+ preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, missingInputsWrongOutputWorkflowDefinition), ) - cfg := &config.Config{ - Log: logConfig, - } - stepRegistry := NewTestImplStepRegistry(logger, t) - - executor := lang.Must2(workflow.NewExecutor( - logger, - cfg, - stepRegistry, - )) - wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForParallelWorkflowDefinition))) - preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) - outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) - assert.NoError(t, err) - assert.Equals(t, outputID, "success") - stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"] - assert.NotNil(t, stepResult2) - stepResult3 := outputData.(map[interface{}]interface{})["third_step_output"] - assert.NotNil(t, stepResult3) - t.Log(stepResult3) - - duration := time.Since(startTime) - t.Logf("Test execution time: %s", duration) - var waitSuccess bool - if duration > 10*time.Second && duration < 20*time.Second { - waitSuccess = true - t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.") - } else { - waitSuccess = false - t.Logf("Steps second_wait and third_wait are not running in parallel.") - } - assert.Equals(t, waitSuccess, true) + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.Error(t, err) + assert.Equals(t, outputID, "") }
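
A note on the lastError change in workflow.go: several goroutines (step stage callbacks, the deadlock recheck) can now report errors concurrently, so a single overwritten field would drop all but the last one. The sketch below isolates that drain-and-consolidate pattern outside the engine; errorCollector and newErrorCollector are hypothetical names, with the channel capacity of 20 taken from the patch:

	package main

	import (
		"errors"
		"fmt"
	)

	// errorCollector stands in for loopState: producers send into a buffered
	// channel instead of overwriting one field, so concurrent errors are kept.
	type errorCollector struct {
		recent chan error // buffer absorbs bursts of subsequent errors
	}

	func newErrorCollector() *errorCollector {
		return &errorCollector{recent: make(chan error, 20)}
	}

	// lastError drains everything reported since the previous call and
	// consolidates it into one error, mirroring getLastError above.
	func (c *errorCollector) lastError() error {
		var errs []error
	errGatherLoop:
		for {
			select {
			case err := <-c.recent:
				errs = append(errs, err)
			default:
				break errGatherLoop // channel drained
			}
		}
		switch len(errs) {
		case 0:
			return nil
		case 1:
			return errs[0]
		default:
			return fmt.Errorf("multiple errors: %v", errs)
		}
	}

	func main() {
		c := newErrorCollector()
		c.recent <- errors.New("failed to remove stage node")
		c.recent <- errors.New("failed to provide stage input")
		fmt.Println(c.lastError()) // multiple errors: [...]
		fmt.Println(c.lastError()) // <nil>: only errors since the last drain
	}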