diff --git a/README.md b/README.md index a5fd2b23..5615a976 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ This binary can then be used to run Arcaflow workflows. The simplest workflow is the example plugin workflow using the workflow schema version `v0.1.0`: (save it to workflow.yaml) ```yaml -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -72,7 +72,7 @@ You can load this config by passing the `-config` flag to Arcaflow. ### Supported Workflow Schema Versions -- v0.1.0 +- v0.2.0 ## Deployer options diff --git a/config/config.go b/config/config.go index 55e45cbc..3468e719 100644 --- a/config/config.go +++ b/config/config.go @@ -15,9 +15,9 @@ type Config struct { // TypeHintPlugins holds a list of plugins that will be used when building a type hint (e.g. JSONSchema) file for // workflows. TypeHintPlugins []string `json:"plugins" yaml:"plugins"` - // LocalDeployer holds the configuration for executing plugins locally. This deployer is used to obtain the schema + // LocalDeployers holds the configuration for executing plugins locally. This deployer is used to obtain the schema // from the plugins before executing them in a remote environment. - LocalDeployer any `json:"deployer" yaml:"deployer"` + LocalDeployers map[string]any `json:"deployers" yaml:"deployers"` // Log configures logging for workflow runs. Log log.Config `json:"log" yaml:"log"` // StepOutputLogging allows logging of step output diff --git a/config/load_test.go b/config/load_test.go index 674cd1fb..daabe002 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -18,8 +18,8 @@ var configLoadData = map[string]struct { input: "", expectedOutput: &config.Config{ TypeHintPlugins: nil, - LocalDeployer: map[string]any{ - "type": "docker", + LocalDeployers: map[string]any{ + "image": map[string]string{"deployer_name": "docker"}, }, Log: log.Config{ Level: log.LevelInfo, @@ -34,8 +34,8 @@ log: `, expectedOutput: &config.Config{ TypeHintPlugins: nil, - LocalDeployer: map[string]any{ - "type": "docker", + LocalDeployers: map[string]any{ + "image": map[string]string{"deployer_name": "docker"}, }, Log: log.Config{ Level: log.LevelDebug, @@ -45,13 +45,14 @@ log: }, "type-kubernetes": { input: ` -deployer: - type: kubernetes +deployers: + image: + deployer_name: kubernetes `, expectedOutput: &config.Config{ TypeHintPlugins: nil, - LocalDeployer: map[string]any{ - "type": "kubernetes", + LocalDeployers: map[string]any{ + "image": map[string]string{"deployer_name": "kubernetes"}, }, Log: log.Config{ Level: log.LevelInfo, @@ -68,8 +69,8 @@ plugins: TypeHintPlugins: []string{ "quay.io/arcalot/example-plugin:latest", }, - LocalDeployer: map[string]any{ - "type": "docker", + LocalDeployers: map[string]any{ + "image": map[string]string{"deployer_name": "docker"}, }, Log: log.Config{ Level: log.LevelInfo, diff --git a/config/schema.go b/config/schema.go index ae940ea1..9bd216e5 100644 --- a/config/schema.go +++ b/config/schema.go @@ -45,20 +45,25 @@ func getConfigSchema() *schema.TypedScopeSchema[*Config] { nil, nil, ), - "deployer": schema.NewPropertySchema( - schema.NewAnySchema(), + "deployers": schema.NewPropertySchema( + schema.NewMapSchema( + schema.NewStringSchema(nil, nil, nil), + schema.NewAnySchema(), + nil, + nil, + ), schema.NewDisplayValue( - schema.PointerTo("Local deployer"), + schema.PointerTo("Local deployers"), schema.PointerTo( - "Local container environment configuration the workflow engine can use to test-deploy plugins before the workflow execution.", + "Default deployers for each plugin type.", ), nil, ), - true, + false, nil, nil, nil, - schema.PointerTo("{\"type\":\"docker\"}"), + schema.PointerTo(`{"image": {"deployer_name": "docker"}}`), nil, ), "logged_outputs": schema.NewPropertySchema( diff --git a/engine.go b/engine.go index df3dd536..d82687a3 100644 --- a/engine.go +++ b/engine.go @@ -5,7 +5,6 @@ import ( "context" "fmt" log "go.arcalot.io/log/v2" - "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/config" "go.flow.arcalot.io/engine/internal/step" "go.flow.arcalot.io/engine/internal/yaml" @@ -14,7 +13,7 @@ import ( ) var supportedVersions = map[string]struct{}{ - "v0.1.0": {}, + "v0.2.0": {}, } // WorkflowEngine is responsible for executing workflows and returning their result. @@ -58,10 +57,9 @@ type Workflow interface { } type workflowEngine struct { - logger log.Logger - deployerRegistry registry.Registry - stepRegistry step.Registry - config *config.Config + logger log.Logger + stepRegistry step.Registry + config *config.Config } func (w workflowEngine) RunWorkflow(ctx context.Context, input []byte, workflowContext map[string][]byte, workflowFileName string) (outputID string, outputData any, outputError bool, err error) { diff --git a/engine_test.go b/engine_test.go index 3ff7cc5b..be8dd1f0 100644 --- a/engine_test.go +++ b/engine_test.go @@ -14,9 +14,11 @@ import ( ) func TestEngineWorkflow_ParseVersion(t *testing.T) { - _, err := engine.SupportedVersion("v0.1.0") + _, err := engine.SupportedVersion("v0.2.0") assert.NoError(t, err) - _, err = engine.SupportedVersion("v0.11.0") + + // test unsupported version + _, err = engine.SupportedVersion("v0.1000.0") assert.Error(t, err) } @@ -102,7 +104,7 @@ func TestEmptySteps(t *testing.T) { context.Background(), nil, map[string][]byte{ - "workflow.yaml": []byte(`version: v0.1.0 + "workflow.yaml": []byte(`version: v0.2.0 output: [] steps: []`), }, @@ -117,7 +119,7 @@ func TestNoSteps(t *testing.T) { context.Background(), nil, map[string][]byte{ - "workflow.yaml": []byte(`version: v0.1.0 + "workflow.yaml": []byte(`version: v0.2.0 output: []`), }, "", @@ -131,7 +133,7 @@ func TestE2E(t *testing.T) { context.Background(), []byte(`name: Arca Lot`), map[string][]byte{ - "workflow.yaml": []byte(`version: v0.1.0 + "workflow.yaml": []byte(`version: v0.2.0 input: root: RootObject objects: @@ -143,7 +145,9 @@ input: type_id: string steps: example: - plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1 + plugin: + src: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1 + deployment_type: image input: name: !expr $.input.name output: @@ -162,7 +166,7 @@ func TestE2EMultipleOutputs(t *testing.T) { context.Background(), []byte(`name: Arca Lot`), map[string][]byte{ - "workflow.yaml": []byte(`version: v0.1.0 + "workflow.yaml": []byte(`version: v0.2.0 input: root: RootObject objects: @@ -174,7 +178,9 @@ input: type_id: string steps: example: - plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1 + plugin: + src: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1 + deployment_type: image input: name: !expr $.input.name outputs: diff --git a/go.mod b/go.mod index de8d6e7d..032fcef2 100644 --- a/go.mod +++ b/go.mod @@ -7,14 +7,14 @@ require ( go.arcalot.io/dgraph v1.1.0 go.arcalot.io/lang v1.0.0 go.arcalot.io/log/v2 v2.0.0 - go.flow.arcalot.io/deployer v0.3.0 - go.flow.arcalot.io/dockerdeployer v0.4.0 + go.flow.arcalot.io/deployer v0.4.0 + go.flow.arcalot.io/dockerdeployer v0.5.0 go.flow.arcalot.io/expressions v0.2.1 - go.flow.arcalot.io/kubernetesdeployer v0.7.0 + go.flow.arcalot.io/kubernetesdeployer v0.8.0 go.flow.arcalot.io/pluginsdk v0.5.0 - go.flow.arcalot.io/podmandeployer v0.5.0 - go.flow.arcalot.io/pythondeployer v0.3.0 - go.flow.arcalot.io/testdeployer v0.3.0 + go.flow.arcalot.io/podmandeployer v0.6.0 + go.flow.arcalot.io/pythondeployer v0.4.0 + go.flow.arcalot.io/testdeployer v0.4.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -24,7 +24,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/distribution v2.8.3+incompatible // indirect - github.com/docker/docker v24.0.6+incompatible // indirect + github.com/docker/docker v24.0.7+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.10.0 // indirect @@ -50,7 +50,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/x448/float16 v0.8.4 // indirect - go.flow.arcalot.io/testplugin v0.2.1 // indirect + go.flow.arcalot.io/testplugin v0.3.0 // indirect golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.15.0 // indirect golang.org/x/oauth2 v0.2.0 // indirect diff --git a/go.sum b/go.sum index fc01050e..1144696a 100644 --- a/go.sum +++ b/go.sum @@ -23,8 +23,8 @@ github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v24.0.6+incompatible h1:hceabKCtUgDqPu+qm0NgsaXf28Ljf4/pWFL7xjWWDgE= -github.com/docker/docker v24.0.6+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= +github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -160,24 +160,24 @@ go.arcalot.io/lang v1.0.0 h1:mgDaieT4wWdZTnR4V7+/pgYRmzfU7VZZgIzHccuxAbY= go.arcalot.io/lang v1.0.0/go.mod h1:ALqfYEhAzC2WoGLaycmJoNJd5NmkR7V1PSKp/c5D278= go.arcalot.io/log/v2 v2.0.0 h1:mbmsWDVBXZNWrDzUh5JLzeGCQ59kTuMFs+pyfJGc1hk= go.arcalot.io/log/v2 v2.0.0/go.mod h1:1V8jnFIIGwh2CtcGkHNOmy1nCo7LbazQNkUcnKYNMn4= -go.flow.arcalot.io/deployer v0.3.0 h1:LPikgRG5jGA76W8JthycvzfREL5Y0+++KAiQxSnKhdU= -go.flow.arcalot.io/deployer v0.3.0/go.mod h1:x6gsz/hANR8qN1nerpyY3vXpdaqofDH5Wlg+Nsqg/x0= -go.flow.arcalot.io/dockerdeployer v0.4.0 h1:t5b8o3xfKKb/WIX558486csjo4uMQmAXsikBLsKFEIg= -go.flow.arcalot.io/dockerdeployer v0.4.0/go.mod h1:UZSM6buJBRlgCURUE/BVkak8tfAXzj3oeQBSRZECbSc= +go.flow.arcalot.io/deployer v0.4.0 h1:5YveLCX+zc8Ra/aukHOwD5OrJD2W8WRzoruf3bpJfqY= +go.flow.arcalot.io/deployer v0.4.0/go.mod h1:x6gsz/hANR8qN1nerpyY3vXpdaqofDH5Wlg+Nsqg/x0= +go.flow.arcalot.io/dockerdeployer v0.5.0 h1:CKsy5K38/0hF6dEry7f0YIfI5uNKGImyCwi8miwPEhE= +go.flow.arcalot.io/dockerdeployer v0.5.0/go.mod h1:zE/okmoc47PTdJxiag+Gk9LMzKR1GemeP1Q3woOY3/A= go.flow.arcalot.io/expressions v0.2.1 h1:TAAbDrgJJLpmgA5ASyP/KzrXWtpEaQ8JsCPHgpe5kLw= go.flow.arcalot.io/expressions v0.2.1/go.mod h1:Vw1ScNu4Uyw1/l87LAH8jxe0DyRWwMh+rlfB/BPYDOU= -go.flow.arcalot.io/kubernetesdeployer v0.7.0 h1:r41qWc/XiPy9l3cfMXZG8F2kGenRh1xsx2auim/Ydyw= -go.flow.arcalot.io/kubernetesdeployer v0.7.0/go.mod h1:VvU6duoo5NR2ITUhx/UCGrkdJnXIeYm+/yHmGKtkXsk= +go.flow.arcalot.io/kubernetesdeployer v0.8.0 h1:UjH/aspPif/k+X65sLWlNDZAW5JlzUfgOnLHOrhxEQk= +go.flow.arcalot.io/kubernetesdeployer v0.8.0/go.mod h1:BhERhKpvQMJkrcW9lbBF4kJEe+OGhz2NpSftZIgtVNQ= go.flow.arcalot.io/pluginsdk v0.5.0 h1:TRS/waCTcdoMZ9neDAcfy3zpzyDnPHRbhV+Y1kpcw3Y= go.flow.arcalot.io/pluginsdk v0.5.0/go.mod h1:2s2f//7uOkBjr1QaiWJD/bqDIeLlINJtD1BhiY4aGPM= -go.flow.arcalot.io/podmandeployer v0.5.0 h1:h7hEhWUgxJzNKlEohZ+meKhl3FWjaXQahQ8vN3YVRNs= -go.flow.arcalot.io/podmandeployer v0.5.0/go.mod h1:36JCcTB6nauahcXUPfIpdEw7Zfp0ufM07o3VNTvrCc0= -go.flow.arcalot.io/pythondeployer v0.3.0 h1:ercLuDwFoDSL0f6YvZEqFW0/nO7Yv7DkbROl3rKxYDk= -go.flow.arcalot.io/pythondeployer v0.3.0/go.mod h1:ND1x/Vhu/6q50zQeisCcD6oQ6lKVJFflOrfDccnIjSY= -go.flow.arcalot.io/testdeployer v0.3.0 h1:Soyz+rDa3Y3VjWBGuL3zNlX3LM4uKp9Ex7///fCgrZA= -go.flow.arcalot.io/testdeployer v0.3.0/go.mod h1:Eel0ORhtKdYYDsd+e+btBBygIn+9Sz/b+JFDwH39VWI= -go.flow.arcalot.io/testplugin v0.2.1 h1:9kQ2MKvcXtEcwk5c4qSWN+FovpER2C9vn730laAm9iE= -go.flow.arcalot.io/testplugin v0.2.1/go.mod h1:ZoVF8tIKppQmj5nvoZPA48GQ7BuoWXQcuCw2x2sJxjE= +go.flow.arcalot.io/podmandeployer v0.6.0 h1:SlcQUU6xt24Oa0OFNnwYE+d+XQbDXErMLvpCi2gMHoA= +go.flow.arcalot.io/podmandeployer v0.6.0/go.mod h1:4wfcl0qjV02y64We3ZSDz+3lwdOfbe+gpFjm7SQKTRA= +go.flow.arcalot.io/pythondeployer v0.4.0 h1:l8nw6awYMVzgND+ZXdbnNJPYu3V0sgSUFsIzn+SRgh0= +go.flow.arcalot.io/pythondeployer v0.4.0/go.mod h1:me9SIMVTCBzCmceILdBMxXYrZGakiPOMasHgujmgJlE= +go.flow.arcalot.io/testdeployer v0.4.0 h1:helexgZOnYlbzU+egkxsLs95iMwE3lc+vp+Qbv2xwFI= +go.flow.arcalot.io/testdeployer v0.4.0/go.mod h1:Luw76oKeri40a7k8buk7Q7J86Tpt8lf9LTVZr6lqgTk= +go.flow.arcalot.io/testplugin v0.3.0 h1:LY0VWL1M0X+mSE0nVCwoD0PjQHqKhbka2Gc70uuP04k= +go.flow.arcalot.io/testplugin v0.3.0/go.mod h1:TW6h/kZyM+4gBBKqvoDI0XWXgNcUMKbDHaQOaKIRZdk= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/internal/step/foreach/provider_example_test.go b/internal/step/foreach/provider_example_test.go index 952cf7ae..6bc248a8 100644 --- a/internal/step/foreach/provider_example_test.go +++ b/internal/step/foreach/provider_example_test.go @@ -16,7 +16,7 @@ import ( // mainWorkflow is the workflow calling the foreach step. var mainWorkflow = ` -version: v0.1.0 +version: v0.2.0 input: root: names objects: @@ -44,7 +44,7 @@ output: ` var subworkflow = ` -version: v0.1.0 +version: v0.2.0 input: root: name objects: diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index 71fc0354..8912e95b 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -26,19 +26,34 @@ const errorStr = "error" // deployers. Most importantly it specifies which deployer is used for this // deployment with the 'type' key. // For more info, see `config/schema.go` -func New(logger log.Logger, deployerRegistry registry.Registry, localDeployerConfig any) (step.Provider, error) { - unserializedLocalDeployerConfig, err := deployerRegistry.Schema().Unserialize(localDeployerConfig) - if err != nil { - return nil, fmt.Errorf("failed to load local deployer configuration, please check your Arcaflow configuration file (%w)", err) - } - localDeployer, err := deployerRegistry.Create(unserializedLocalDeployerConfig, logger.WithLabel("source", "deployer")) - if err != nil { - return nil, fmt.Errorf("invalid local deployer configuration, please check your Arcaflow configuration file (%w)", err) +func New(logger log.Logger, deployerRegistry registry.Registry, localDeployerConfigs map[string]any) (step.Provider, error) { + localDeployers := make(map[deployer.DeploymentType]deployer.Connector) + + // Build local deployers from requested deployers in engine workflow config. + for reqDeploymentType, deployerConfig := range localDeployerConfigs { + reqDeploymentTypeType := deployer.DeploymentType(reqDeploymentType) + // Unserialize config using deployer's schema in registry. + // This will return an error if the requested deployment type + // is not in the registry. + unserializedLocalDeployerConfig, err := deployerRegistry.DeployConfigSchema( + reqDeploymentTypeType).Unserialize(deployerConfig) + if err != nil { + return nil, fmt.Errorf("failed to load requested deployer type %s from workflow config (%w)", + reqDeploymentType, err) + } + + localDeployer, err := deployerRegistry.Create(reqDeploymentTypeType, + unserializedLocalDeployerConfig, logger.WithLabel("source", "deployer")) + if err != nil { + return nil, fmt.Errorf("invalid local deployer configuration, please check your Arcaflow configuration file (%w)", err) + } + localDeployers[reqDeploymentTypeType] = localDeployer } + return &pluginProvider{ logger: logger.WithLabel("source", "plugin-provider"), deployerRegistry: deployerRegistry, - localDeployer: localDeployer, + localDeployers: localDeployers, }, nil } @@ -48,20 +63,62 @@ func (p *pluginProvider) Kind() string { type pluginProvider struct { deployerRegistry registry.Registry - localDeployer deployer.Connector + localDeployers map[deployer.DeploymentType]deployer.Connector logger log.Logger } func (p *pluginProvider) Register(_ step.Registry) { } +func keysString(m []deployer.DeploymentType) string { + keys := make([]string, 0, len(m)) + for _, k := range m { + keys = append(keys, string(k)) + } + return "[" + strings.Join(keys, ", ") + "]" +} + func (p *pluginProvider) ProviderSchema() map[string]*schema.PropertySchema { return map[string]*schema.PropertySchema{ "plugin": schema.NewPropertySchema( - schema.NewStringSchema(schema.PointerTo[int64](1), nil, nil), + schema.NewObjectSchema( + "plugin_fields", + map[string]*schema.PropertySchema{ + "src": schema.NewPropertySchema( + schema.NewStringSchema(schema.PointerTo[int64](1), nil, nil), + schema.NewDisplayValue( + schema.PointerTo("Source"), + schema.PointerTo("Source file to be executed."), nil), + true, + nil, + nil, + nil, + nil, + []string{"\"quay.io/arcaflow/example-plugin:latest\""}, + ), + "deployment_type": schema.NewPropertySchema( + schema.NewStringSchema(schema.PointerTo[int64](1), nil, nil), + schema.NewDisplayValue( + schema.PointerTo("Type"), + schema.PointerTo( + fmt.Sprintf("Deployment type [%s]", + keysString(p.deployerRegistry.DeploymentTypes()))), + nil, + ), + true, + nil, + nil, + nil, + nil, + []string{"image"}, + ), + }, + ), schema.NewDisplayValue( - schema.PointerTo("Plugin"), - schema.PointerTo("Plugin container image to run. This image must be an Arcaflow-compatible container."), + schema.PointerTo("Plugin Info"), + schema.PointerTo( + fmt.Sprintf("Deployment type %s", + keysString(p.deployerRegistry.DeploymentTypes()))), nil, ), true, @@ -69,7 +126,7 @@ func (p *pluginProvider) ProviderSchema() map[string]*schema.PropertySchema { nil, nil, nil, - []string{"\"quay.io/arcaflow/example-plugin:latest\""}, + nil, ), } } @@ -188,44 +245,56 @@ func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { // LoadSchema deploys the plugin, connects to the plugin's ATP server, loads its schema, then // returns a runnableStep struct. Not to be confused with the runningStep struct. func (p *pluginProvider) LoadSchema(inputs map[string]any, _ map[string][]byte) (step.RunnableStep, error) { - image := inputs["plugin"].(string) + pluginSrcInput := inputs["plugin"].(map[string]any) + requestedDeploymentType := deployer.DeploymentType(pluginSrcInput["deployment_type"].(string)) + pluginSource := pluginSrcInput["src"].(string) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - plugin, err := p.localDeployer.Deploy(ctx, image) + applicableLocalDeployer, ok := p.localDeployers[requestedDeploymentType] + if !ok { + return nil, fmt.Errorf("missing local deployer for requested type %s", requestedDeploymentType) + } + pluginConnector, err := applicableLocalDeployer.Deploy(ctx, pluginSource) if err != nil { cancel() - return nil, fmt.Errorf("failed to deploy plugin from image %s (%w)", image, err) + return nil, fmt.Errorf("failed to deploy plugin of deployment type '%s' with source '%s' (%w)", + requestedDeploymentType, pluginSource, err) } // Set up the ATP connection - transport := atp.NewClientWithLogger(plugin, p.logger) + transport := atp.NewClientWithLogger(pluginConnector, p.logger) // Read the schema information s, err := transport.ReadSchema() if err != nil { cancel() - return nil, fmt.Errorf("failed to read plugin schema from %s (%w)", image, err) + // Close it. This allows it go get the error messages. + deployerErr := pluginConnector.Close() + return nil, fmt.Errorf("failed to read plugin schema from '%s' (%w). Deployer close error: (%s)", + pluginSource, err, deployerErr.Error()) } // Tell the server that the client is done if err := transport.Close(); err != nil { - return nil, fmt.Errorf("failed to instruct client to shut down image %s (%w)", image, err) + return nil, fmt.Errorf("failed to instruct client to shut down plugin from source '%s' (%w)", pluginSource, err) } // Shut down the plugin. - if err := plugin.Close(); err != nil { - return nil, fmt.Errorf("failed to shut down local plugin from %s (%w)", image, err) + if err := pluginConnector.Close(); err != nil { + return nil, fmt.Errorf("failed to shut down local plugin from '%s' (%w)", pluginSource, err) } return &runnableStep{ schemas: *s, logger: p.logger, - image: image, + deploymentType: requestedDeploymentType, + source: pluginSource, deployerRegistry: p.deployerRegistry, - localDeployer: p.localDeployer, + localDeployer: applicableLocalDeployer, }, nil } type runnableStep struct { - image string + deploymentType deployer.DeploymentType + source string deployerRegistry registry.Registry logger log.Logger schemas schema.SchemaSchema @@ -262,7 +331,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st steps := r.schemas.Steps() if stepID == "" { if len(steps) != 1 { - return result, fmt.Errorf("the 'step' parameter is required for the '%s' plugin", r.image) + return result, fmt.Errorf("the 'step' parameter is required for the '%s' plugin", r.source) } for possibleStepID := range steps { stepID = possibleStepID @@ -270,7 +339,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st } stepSchema, ok := r.schemas.Steps()[stepID] if !ok { - return result, fmt.Errorf("the step '%s' does not exist in the '%s' plugin", stepID, r.image) + return result, fmt.Errorf("the step '%s' does not exist in the '%s' plugin", stepID, r.source) } stopIfProperty := schema.NewPropertySchema( @@ -291,11 +360,11 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st cancelSignal := stepSchema.SignalHandlers()[plugin.CancellationSignalSchema.ID()] if cancelSignal == nil { // Not present - stopIfProperty.Disable(fmt.Sprintf("Cancel signal with ID '%s' is not present in plugin image '%s', step '%s'. Signal handler IDs present: %v", - plugin.CancellationSignalSchema.ID(), r.image, stepID, reflect.ValueOf(stepSchema.SignalHandlers()).MapKeys())) + stopIfProperty.Disable(fmt.Sprintf("Cancel signal with ID '%s' is not present in plugin '%s', step '%s'. Signal handler IDs present: %v", + plugin.CancellationSignalSchema.ID(), r.source, stepID, reflect.ValueOf(stepSchema.SignalHandlers()).MapKeys())) } else if err := plugin.CancellationSignalSchema.DataSchemaValue.ValidateCompatibility(cancelSignal.DataSchemaValue); err != nil { // Present but incompatible - stopIfProperty.Disable(fmt.Sprintf("Cancel signal invalid schema in plugin image '%s', step '%s' (%s)", r.image, stepID, err)) + stopIfProperty.Disable(fmt.Sprintf("Cancel signal invalid schema in plugin '%s', step '%s' (%s)", r.source, stepID, err)) } return step.Lifecycle[step.LifecycleStageWithSchema]{ @@ -305,7 +374,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st LifecycleStage: deployingLifecycleStage, InputSchema: map[string]*schema.PropertySchema{ "deploy": schema.NewPropertySchema( - r.deployerRegistry.Schema(), + r.deployerRegistry.DeployConfigSchema(r.deploymentType), schema.NewDisplayValue( schema.PointerTo("Deployment configuration"), schema.PointerTo( @@ -444,8 +513,8 @@ func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHand i++ } return nil, fmt.Errorf( - "the %s plugin declares more than one possible step, please provide the step name (one of: %s)", - r.image, + "the '%s' plugin declares more than one possible step, please provide the step name (one of: %s)", + r.source, strings.Join(stepNames, ", "), ) } @@ -456,8 +525,8 @@ func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHand stepSchema, ok := steps[stepID] if !ok { return nil, fmt.Errorf( - "plugin %s does not have a step named %s", - r.image, + "plugin '%s' does not have a step named %s", + r.source, stepID, ) } @@ -475,7 +544,8 @@ func (r *runnableStep) Start(input map[string]any, runID string, stageChangeHand deployInput: make(chan any, 1), runInput: make(chan any, 1), logger: r.logger, - image: r.image, + deploymentType: r.deploymentType, + source: r.source, pluginStepID: stepID, state: step.RunningStepStateStarting, localDeployer: r.localDeployer, @@ -506,7 +576,8 @@ type runningStep struct { logger log.Logger currentStage StageID runID string // The ID associated with this execution (the workflow step ID) - image string + deploymentType deployer.DeploymentType + source string pluginStepID string // The ID of the step in the plugin state step.RunningStepState useLocalDeployer bool @@ -574,7 +645,7 @@ func (r *runningStep) provideDeployInput(input map[string]any) error { var unserializedDeployerConfig any var err error if input["deploy"] != nil { - unserializedDeployerConfig, err = r.deployerRegistry.Schema().Unserialize(input["deploy"]) + unserializedDeployerConfig, err = r.deployerRegistry.DeployConfigSchema(r.deploymentType).Unserialize(input["deploy"]) if err != nil { return fmt.Errorf("invalid deployment information (%w)", err) } @@ -780,12 +851,13 @@ func (r *runningStep) deployStage() (deployer.Plugin, error) { var stepDeployer = r.localDeployer if !useLocalDeployer { var err error - stepDeployer, err = r.deployerRegistry.Create(deployerConfig, r.logger.WithLabel("source", "deployer")) + stepDeployer, err = r.deployerRegistry.Create(r.deploymentType, deployerConfig, + r.logger.WithLabel("source", "deployer")) if err != nil { return nil, err } } - container, err := stepDeployer.Deploy(r.ctx, r.image) + container, err := stepDeployer.Deploy(r.ctx, r.source) if err != nil { return nil, err } diff --git a/internal/step/plugin/provider_test.go b/internal/step/plugin/provider_test.go index 2db5f39b..2f96a2e4 100644 --- a/internal/step/plugin/provider_test.go +++ b/internal/step/plugin/provider_test.go @@ -6,6 +6,7 @@ import ( "go.arcalot.io/log/v2" "go.flow.arcalot.io/deployer" deployer_registry "go.flow.arcalot.io/deployer/registry" + docker "go.flow.arcalot.io/dockerdeployer" "go.flow.arcalot.io/engine/internal/step" "go.flow.arcalot.io/engine/internal/step/plugin" testdeployer "go.flow.arcalot.io/testdeployer" @@ -110,22 +111,107 @@ func (s *stageChangeHandler) OnStepComplete( s.message <- message } -func TestProvider_Utility(t *testing.T) { +func TestProvider_MultipleDeployers(t *testing.T) { + logger := log.New( + log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + }, + ) + deployerRegistry := deployer_registry.New( + deployer.Any(testdeployer.NewFactory()), + deployer.Any(docker.NewFactory())) + deployTimeMs := 20 workflowDeployerCfg := map[string]any{ - "type": "test-impl", + "builtin": map[string]any{ + "deployer_name": "test-impl", + "deploy_time": deployTimeMs, + "deploy_succeed": true, + }, + "image": map[string]any{ + "deployer_name": "docker", + }, } - plp, err := plugin.New( - log.New( - log.Config{ - Level: log.LevelError, - Destination: log.DestinationStdout, - }, - ), - deployer_registry.New( - deployer.Any(testdeployer.NewFactory())), - workflowDeployerCfg, + plp, err := plugin.New(logger, deployerRegistry, workflowDeployerCfg) + assert.NoError(t, err) + assert.Equals(t, plp.Kind(), "plugin") + assert.NotNil(t, plp.ProviderSchema()) + assert.NotNil(t, plp.RunProperties()) + assert.NotNil(t, plp.Lifecycle()) + + stepSchema := map[string]any{ + "plugin": map[string]any{ + "src": "simulation", + "deployment_type": "builtin", + }, + } + byteSchema := map[string][]byte{} + + runnable, err := plp.LoadSchema(stepSchema, byteSchema) + assert.NoError(t, err) + + assert.NotNil(t, runnable.RunSchema()) + + _, err = runnable.Lifecycle(map[string]any{"step": "wait"}) + assert.NoError(t, err) + + _, err = runnable.Lifecycle(map[string]any{"step": "hello"}) + assert.NoError(t, err) + + // There is more than one step, so no specified one will cause an error. + _, err = runnable.Lifecycle(map[string]any{"step": nil}) + assert.Error(t, err) +} + +func TestProvider_MissingDeployer(t *testing.T) { + logger := log.New( + log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + }, ) + deployerRegistry := deployer_registry.New() // Empty. So it will error out. + workflowDeployerCfg := map[string]any{ + "builtin": map[string]any{ + "deployer_name": "test-impl", + }, + } + + _, err := plugin.New(logger, deployerRegistry, workflowDeployerCfg) + assert.Error(t, err) +} +func TestProvider_MismatchedDeploymentTypes(t *testing.T) { + logger := log.New( + log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + }, + ) + deployerRegistry := deployer_registry.New(deployer.Any(testdeployer.NewFactory())) + // Mismatched. test-impl is has the deployment type builtin, but we're trying to specify it for the image type. + workflowDeployerCfg := map[string]any{ + "image": map[string]any{ + "deployer_name": "test-impl", + }, + } + + _, err := plugin.New(logger, deployerRegistry, workflowDeployerCfg) + assert.Error(t, err) +} + +func TestProvider_Utility(t *testing.T) { + workflowDeployerCfg := map[string]any{ + "builtin": map[string]any{"deployer_name": "test-impl"}, + } + + plp, err := plugin.New(log.New( + log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + }, + ), deployer_registry.New( + deployer.Any(testdeployer.NewFactory())), workflowDeployerCfg) assert.NoError(t, err) assert.Equals(t, plp.Kind(), "plugin") assert.NotNil(t, plp.ProviderSchema()) @@ -133,7 +219,10 @@ func TestProvider_Utility(t *testing.T) { assert.NotNil(t, plp.Lifecycle()) stepSchema := map[string]any{ - "plugin": "simulation", + "plugin": map[string]any{ + "src": "simulation", + "deployment_type": "builtin", + }, } byteSchema := map[string][]byte{} @@ -161,28 +250,36 @@ func TestProvider_HappyError(t *testing.T) { }, ) workflowDeployerCfg := map[string]any{ - "type": "test-impl", + "builtin": map[string]any{ + "deployer_name": "test-impl"}, } deployerRegistry := deployer_registry.New( deployer.Any(testdeployer.NewFactory())) - _, err := plugin.New( - logger, - deployerRegistry, - map[string]any{"deployer_cfg": "bad"}, - ) + _, err := plugin.New(logger, deployerRegistry, map[string]any{ + "wrong": map[string]any{ + "deployer_name": "test-impl", + }}) assert.Error(t, err) - plp, err := plugin.New( - logger, - deployerRegistry, - workflowDeployerCfg, - ) + _, err = plugin.New(logger, deployerRegistry, map[string]any{ + "builtin": map[string]any{ + "deployer_name": "bad", + }}) + assert.Error(t, err) + + plp, err := plugin.New(logger, deployerRegistry, workflowDeployerCfg) assert.NoError(t, err) - runnable, err := plp.LoadSchema( - map[string]any{"plugin": "simulation"}, map[string][]byte{}) + stepSchema := map[string]any{ + "plugin": map[string]any{ + "src": "simulation", + "deployment_type": "builtin"}, + } + byteSchema := map[string][]byte{} + + runnable, err := plp.LoadSchema(stepSchema, byteSchema) assert.NoError(t, err) handler := &stageChangeHandler{ @@ -205,23 +302,23 @@ func TestProvider_HappyError(t *testing.T) { assert.Error(t, running.ProvideStageInput( string(plugin.StageIDDeploy), map[string]any{"deploy": map[string]any{ - "type": "test-impl", - "deploy_time": "abc"}}, + "deployer_name": "test-impl", + "deploy_time": "abc"}}, )) assert.NoError(t, running.ProvideStageInput( string(plugin.StageIDDeploy), map[string]any{"deploy": map[string]any{ - "type": "test-impl", - "deploy_time": 1}}, + "deployer_name": "test-impl", + "deploy_time": 1}}, )) // provide deploy input a 2nd time assert.Error(t, running.ProvideStageInput( string(plugin.StageIDDeploy), map[string]any{"deploy": map[string]any{ - "type": "test-impl", - "deploy_time": nil}}, + "deployer_name": "test-impl", + "deploy_time": nil}}, )) // unserialize nil input schema error @@ -271,21 +368,24 @@ func TestProvider_VerifyCancelSignal(t *testing.T) { }, ) workflowDeployerCfg := map[string]any{ - "type": "test-impl", + "builtin": map[string]any{ + "deployer_name": "test-impl", + }, } deployerRegistry := deployer_registry.New( deployer.Any(testdeployer.NewFactory())) - plp, err := plugin.New( - logger, - deployerRegistry, - workflowDeployerCfg, - ) + plp, err := plugin.New(logger, deployerRegistry, workflowDeployerCfg) assert.NoError(t, err) runnable, err := plp.LoadSchema( - map[string]any{"plugin": "simulation"}, map[string][]byte{}) + map[string]any{ + "plugin": map[string]any{ + "src": "simulation", + "deployment_type": "builtin", + }}, + map[string][]byte{}) assert.NoError(t, err) assert.NotNil(t, runnable) @@ -317,33 +417,35 @@ func TestProvider_VerifyCancelSignal(t *testing.T) { } func TestProvider_DeployFail(t *testing.T) { - logConfig := log.Config{ - Level: log.LevelError, - Destination: log.DestinationStdout, - } logger := log.New( - logConfig, + log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + }, ) - + deployerRegistry := deployer_registry.New( + deployer.Any(testdeployer.NewFactory())) deployTimeMs := 20 workflowDeployerCfg := map[string]any{ - "type": "test-impl", - "deploy_time": deployTimeMs, - "deploy_succeed": true, + "builtin": map[string]any{ + "deployer_name": "test-impl", + "deploy_time": deployTimeMs, + "deploy_succeed": true, + }, } - deployerRegistry := deployer_registry.New( - deployer.Any(testdeployer.NewFactory())) - - plp, err := plugin.New( - logger, - deployerRegistry, - workflowDeployerCfg, - ) + plp, err := plugin.New(logger, deployerRegistry, workflowDeployerCfg) assert.NoError(t, err) - runnable, err := plp.LoadSchema( - map[string]any{"plugin": "simulation"}, map[string][]byte{}) + stepSchema := map[string]any{ + "plugin": map[string]any{ + "src": "simulation", + "deployment_type": "builtin", + }, + } + byteSchema := map[string][]byte{} + + runnable, err := plp.LoadSchema(stepSchema, byteSchema) assert.NoError(t, err) handler := &deployFailStageChangeHandler{ @@ -356,10 +458,13 @@ func TestProvider_DeployFail(t *testing.T) { assert.NoError(t, running.ProvideStageInput( string(plugin.StageIDDeploy), - map[string]any{"deploy": map[string]any{ - "type": "test-impl", - "deploy_succeed": false, - "deploy_time": deployTimeMs}}, + map[string]any{ + "deploy": map[string]any{ + "deployer_name": "test-impl", + "deploy_succeed": false, + "deploy_time": deployTimeMs, + }, + }, )) waitTimeMs := 50 @@ -392,22 +497,25 @@ func TestProvider_StartFail(t *testing.T) { ) deployTimeMs := 20 workflowDeployerCfg := map[string]any{ - "type": "test-impl", - "deploy_time": deployTimeMs, - "deploy_succeed": true, + "builtin": map[string]any{ + "deployer_name": "test-impl", + "deploy_time": deployTimeMs, + "deploy_succeed": true, + }, } - plp, err := plugin.New( - logger, - deployer_registry.New( - deployer.Any(testdeployer.NewFactory())), - workflowDeployerCfg, - ) + plp, err := plugin.New(logger, deployer_registry.New( + deployer.Any(testdeployer.NewFactory())), workflowDeployerCfg) assert.NoError(t, err) - runnable, err := plp.LoadSchema( - map[string]any{"plugin": "simulation"}, - map[string][]byte{}) + stepSchema := map[string]any{ + "plugin": map[string]any{ + "src": "simulation", + "deployment_type": "builtin"}, + } + byteSchema := map[string][]byte{} + + runnable, err := plp.LoadSchema(stepSchema, byteSchema) assert.NoError(t, err) handler := &startFailStageChangeHandler{ @@ -421,7 +529,7 @@ func TestProvider_StartFail(t *testing.T) { assert.NoError(t, running.ProvideStageInput( string(plugin.StageIDDeploy), map[string]any{"deploy": map[string]any{ - "type": "test-impl", + "deployer_name": "test-impl", "deploy_succeed": true, "deploy_time": deployTimeMs, "disable_plugin_writes": true}}, diff --git a/new.go b/new.go index a4b0bebc..c923f7a3 100644 --- a/new.go +++ b/new.go @@ -11,18 +11,15 @@ func New( config *config.Config, ) (WorkflowEngine, error) { logger := log.New(config.Log) - stepRegistry, err := NewDefaultStepRegistry( - logger, - DefaultDeployerRegistry, - config, - ) + + stepRegistry, err := NewDefaultStepRegistry(logger, + DefaultDeployerRegistry, config) if err != nil { return nil, err } return &workflowEngine{ - logger: logger, - config: config, - stepRegistry: stepRegistry, - deployerRegistry: DefaultDeployerRegistry, + logger: logger, + config: config, + stepRegistry: stepRegistry, }, nil } diff --git a/steps.go b/steps.go index db1f382a..9840f752 100644 --- a/steps.go +++ b/steps.go @@ -2,7 +2,6 @@ package engine import ( "fmt" - "go.arcalot.io/log/v2" deployerRegistry "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/config" @@ -14,12 +13,8 @@ import ( ) // NewDefaultStepRegistry creates a registry with the default step types applied. -func NewDefaultStepRegistry( - logger log.Logger, - deployerRegistry deployerRegistry.Registry, - config *config.Config, -) (step.Registry, error) { - pluginProvider, err := plugin.New(logger, deployerRegistry, config.LocalDeployer) +func NewDefaultStepRegistry(logger log.Logger, deployerRegistry deployerRegistry.Registry, config *config.Config) (step.Registry, error) { + pluginProvider, err := plugin.New(logger, deployerRegistry, config.LocalDeployers) if err != nil { return nil, fmt.Errorf("failed to create plugin step provider (%w)", err) } diff --git a/workflow/executor_example_test.go b/workflow/executor_example_test.go index 25a1011f..b68ab747 100644 --- a/workflow/executor_example_test.go +++ b/workflow/executor_example_test.go @@ -13,7 +13,7 @@ import ( ) var workflowYAML = `--- -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: diff --git a/workflow/executor_test.go b/workflow/executor_test.go index 80fae2c4..daecaf7c 100644 --- a/workflow/executor_test.go +++ b/workflow/executor_test.go @@ -64,8 +64,10 @@ func NewTestImplStepRegistry( pluginProvider := assert.NoErrorR[step.Provider](t)( plugin.New(logger, deployerRegistry, map[string]interface{}{ - "type": "test-impl", - "deploy_time": "0", + "builtin": map[string]any{ + "deployer_name": "test-impl", + "deploy_time": "0", + }, }), ) return assert.NoErrorR[step.Registry](t)(stepregistry.New( @@ -74,7 +76,7 @@ func NewTestImplStepRegistry( } var sharedInputWorkflowYAML = `--- -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -115,7 +117,7 @@ func TestSharedInput(t *testing.T) { } var missingInputWorkflowDefinition1 = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -133,7 +135,7 @@ outputs: ` var missingInputWorkflowDefinition2 = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -159,7 +161,7 @@ func TestMissingInput(t *testing.T) { } var mismatchedStepInputTypesWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -189,7 +191,7 @@ func TestMismatchedStepInputTypes(t *testing.T) { } var mismatchedInputTypesWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index f7f56254..c2bd6bc4 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -14,7 +14,7 @@ import ( ) var badWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: name objects: @@ -46,7 +46,7 @@ func TestOutputFailed(t *testing.T) { } var stepCancellationWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -55,13 +55,17 @@ input: properties: {} steps: long_wait: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 2000 stop_if: !expr $.steps.short_wait.outputs short_wait: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: # It needs to be long enough for it to ensure that long_wait is in a running state. @@ -90,7 +94,7 @@ func TestStepCancellation(t *testing.T) { } var earlyStepCancellationWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -101,18 +105,24 @@ steps: # This one needs to run longer than the total time expected of all the other steps, with # a large enough difference to prevent timing errors breaking the test. end_wait: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 80 # Delay needs to be delayed long enough to ensure that last_step isn't running when it's cancelled by short_wait delay: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 50 last_step: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 0 @@ -121,7 +131,9 @@ steps: # You can verify that this test works by commenting out this line. It should fail. stop_if: !expr $.steps.short_wait.outputs short_wait: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: # End the test quickly. @@ -156,7 +168,7 @@ func TestEarlyStepCancellation(t *testing.T) { } var deploymentStepCancellationWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -167,12 +179,16 @@ steps: # This one needs to run longer than the total time expected of all the other steps, with # a large enough difference to prevent timing errors breaking the test. end_wait: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 100 step_to_cancel: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 0 @@ -180,10 +196,12 @@ steps: stop_if: !expr $.steps.short_wait.outputs # Delay needs to be delayed long enough to ensure that it's in a deploy state when it's cancelled by short_wait deploy: - type: "test-impl" - deploy_time: 50 # 50 ms + deployer_name: "test-impl" + deploy_time: 50 # 50 ms short_wait: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: # End the test quickly. @@ -218,7 +236,7 @@ func TestDeploymentStepCancellation(t *testing.T) { } var simpleValidLiteralInputWaitWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -227,7 +245,9 @@ input: properties: {} steps: wait_1: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 0 @@ -247,7 +267,7 @@ func TestSimpleValidWaitWorkflow(t *testing.T) { } var waitForSerialWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -256,13 +276,17 @@ input: properties: {} steps: first_wait: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: # Note: 5ms left only a 2.5ms margin for error. 10ms left almost 6ms. So 10ms min is recommended. wait_time_ms: 10 second_wait: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 10 @@ -323,7 +347,7 @@ func TestWaitForSerial(t *testing.T) { } var missingInputsFailedDeploymentWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -332,16 +356,20 @@ input: properties: {} steps: wait_1: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 0 deploy: - type: "test-impl" + deployer_name: "test-impl" #deploy_time: 20000 # 10 ms deploy_succeed: false wait_2: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait wait_for: !expr $.steps.wait_1.outputs.success input: @@ -362,7 +390,7 @@ func TestMissingInputsFailedDeployment(t *testing.T) { } var missingInputsWrongOutputWorkflowDefinition = ` -version: v0.1.0 +version: v0.2.0 input: root: RootObject objects: @@ -371,12 +399,17 @@ input: properties: {} steps: wait_1: - plugin: "n/a" + plugin: + src: "n/a" + deployment_type: "builtin" step: wait input: wait_time_ms: 0 wait_2: - plugin: "n/a" + + plugin: + src: "n/a" + deployment_type: "builtin" step: wait # No stop_if, so this shouldn't happen. wait_for: !expr $.steps.wait_1.outputs.cancelled_early