Skip to content

Commit

Permalink
ATP v3, Validate Compatibility, Improved stop_if, better logs, retry …
Browse files Browse the repository at this point in the history
…fix (#114)

* Added workflow pre-validation

Also improved debug messages

* Updated go mod/sum

* Progress towards debugging deadlock detection

Mostly this is test cases, and some mild refactoring that should have a similar result

* Use image that supports ARM64

* Ensure that run finishes before closing the step

This ensures that the goroutines are done when the engine exits. This fixes a test case, and will be important when retrying is added to the deadlock check

* Added deadlock check retries

This is a workaround for the situation where all of the steps are processing at the same time, all waiting for input

* Update test cases to take less time

Also improved error logic

* Update dependencies

Required for prior commits to work

* Update go SDK to dev branch

This adds the done message to the SDK

* Updated test deployer to dev branch

* Updated deployers, and added python deployer to run-plugin

* Label loggers

* Update to use ATP v3

This means new error handling, so multiple errors can be reported

* Update SDK in go mod, and add force stop to step interface

Force stop is good for when you do not need to wait for the ATP client, or something equivalant, to finish. Waiting for the ATP to finish means waiting for the steps to finish entirely.

* Cleanup

* Fix tests

* Update SDK and test deployer

* Reduce redundancy in Plugin Provider

* Refactoring to fix linting errors

I reduced the amount of code in one function, and reduced redundancy of unlocking

* Fix linting errors

These ones were only shown on CI, not locally

* bump internal dependencies

* update all internal dependencies

* Remove fragile test

* bump python deployer to 0.3.0

---------

Signed-off-by: Dustin Black <[email protected]>
Co-authored-by: Dustin Black <[email protected]>
  • Loading branch information
jaredoconnell and dustinblack committed Oct 26, 2023
1 parent 1431d1b commit b292124
Show file tree
Hide file tree
Showing 17 changed files with 1,330 additions and 567 deletions.
2 changes: 1 addition & 1 deletion cmd/arcaflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Options:
}
cfg.Log.Stdout = os.Stderr

logger := log.New(cfg.Log)
logger := log.New(cfg.Log).WithLabel("source", "main")

dirContext, err := loadContext(dir)
if err != nil {
Expand Down
64 changes: 50 additions & 14 deletions cmd/run-plugin/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"flag"
"fmt"
"go.flow.arcalot.io/deployer"
"go.flow.arcalot.io/pluginsdk/plugin"
"go.flow.arcalot.io/pluginsdk/schema"
podman "go.flow.arcalot.io/podmandeployer"
pythondeployer "go.flow.arcalot.io/pythondeployer"
"os"
"os/signal"

Expand All @@ -21,14 +24,19 @@ func main() {
var image string
var file string
var stepID string
var pythonPath string
var d deployer.AnyConnectorFactory
var defaultConfig any
var deployerID = "docker"
var runID = "run"
var workingDir string

flag.StringVar(&image, "image", image, "Docker image to run")
flag.StringVar(&file, "file", file, "Input file")
flag.StringVar(&stepID, "step", stepID, "Step name")
flag.StringVar(&deployerID, "deployer", stepID, "The name of the deployer")
flag.StringVar(&pythonPath, "pythonpath", "", "Path to the Python environment")
flag.StringVar(&workingDir, "workingdir", "~/", "Path to store cloned repositories")
flag.Parse()

switch deployerID {
Expand Down Expand Up @@ -60,35 +68,56 @@ func main() {
if err != nil {
panic(err)
}
case "python":
pythonFactory := pythondeployer.NewFactory()
d = deployer.Any(pythonFactory)
configSchema := pythonFactory.ConfigurationSchema()
var err error
configInput := map[string]any{}
configInput["pythonPath"] = pythonPath
configInput["workdir"] = workingDir
defaultConfig, err = configSchema.UnserializeType(configInput)
if err != nil {
panic(err)
}
default:
panic("No deployer or invalid deployer selected. Options: docker, podman, testimpl. Select with -deployer")
panic("No deployer or invalid deployer selected. Options: docker, podman, testimpl, python. Select with -deployer")
}

connector, err := d.Create(defaultConfig, log.New(log.Config{
logger := log.New(log.Config{
Level: log.LevelDebug,
Destination: log.DestinationStdout,
}))
})
connector, err := d.Create(defaultConfig, logger)
if err != nil {
panic(err)
}
ctrlC := make(chan os.Signal, 1)
signal.Notify(ctrlC, os.Interrupt)

// Set up the signal channel to send cancel signal on ctrl-c
toStepSignals := make(chan schema.Input, 3)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-ctrlC:
fmt.Println("Received CTRL-C. Cancelling the context to cancel the step...")
logger.Infof("Received CTRL-C. Sending cancel signal...")
toStepSignals <- schema.Input{
RunID: runID,
ID: plugin.CancellationSignalSchema.ID(),
InputData: make(map[string]any),
}
logger.Infof("Signal request sent to ATP client.")
cancel()
case <-ctx.Done():
// Done here.
}
}()

fmt.Println("Deploying")
logger.Infof("Deploying")
plugin, err := connector.Deploy(ctx, image)
if err != nil {
logger.Errorf("Error while deploying: %s", err)
panic(err)
}
defer func() {
Expand All @@ -97,8 +126,8 @@ func main() {
}
}()

atpClient := atp.NewClient(plugin)
fmt.Println("Getting schema")
atpClient := atp.NewClientWithLogger(plugin, logger)
logger.Infof("Getting schema")
pluginSchema, err := atpClient.ReadSchema()
if err != nil {
panic(err)
Expand All @@ -120,15 +149,22 @@ func main() {
panic(err)
}
fmt.Printf("Running step %s\n", stepID)
outputID, outputData, err := atpClient.Execute(stepID, input)
result := atpClient.Execute(
schema.Input{RunID: runID, ID: stepID, InputData: input},
toStepSignals,
nil,
)
if err := atpClient.Close(); err != nil {
fmt.Printf("Error closing ATP client: %s", err)
}
output := map[string]any{
"outputID": outputID,
"outputData": outputData,
"err": err,
"outputID": result.OutputID,
"outputData": result.OutputData,
"err": result.Error,
}
result, err := yaml.Marshal(output)
resultStr, err := yaml.Marshal(output)
if err != nil {
panic(err)
}
fmt.Printf("%s", result)
fmt.Printf("%s", resultStr)
}
4 changes: 2 additions & 2 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ input:
type_id: string
steps:
example:
plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.1.0
plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1
input:
name: !expr $.input.name
output:
Expand Down Expand Up @@ -174,7 +174,7 @@ input:
type_id: string
steps:
example:
plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.1.0
plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1
input:
name: !expr $.input.name
outputs:
Expand Down
30 changes: 16 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,32 @@ module go.flow.arcalot.io/engine
go 1.18

require (
go.arcalot.io/assert v1.3.0
go.arcalot.io/assert v1.6.0
go.arcalot.io/dgraph v1.1.0
go.arcalot.io/lang v1.0.0
go.arcalot.io/log/v2 v2.0.0
go.flow.arcalot.io/deployer v0.2.0
go.flow.arcalot.io/dockerdeployer v0.3.0
go.flow.arcalot.io/expressions v0.2.0
go.flow.arcalot.io/kubernetesdeployer v0.5.1
go.flow.arcalot.io/pluginsdk v0.3.0-beta.1
go.flow.arcalot.io/podmandeployer v0.3.1
go.flow.arcalot.io/pythondeployer v0.1.3
go.flow.arcalot.io/testdeployer v0.2.0
go.flow.arcalot.io/deployer v0.3.0
go.flow.arcalot.io/dockerdeployer v0.4.0
go.flow.arcalot.io/expressions v0.2.1
go.flow.arcalot.io/kubernetesdeployer v0.7.0
go.flow.arcalot.io/pluginsdk v0.5.0
go.flow.arcalot.io/podmandeployer v0.5.0
go.flow.arcalot.io/pythondeployer v0.3.0
go.flow.arcalot.io/testdeployer v0.3.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.5+incompatible // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/docker v24.0.6+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.10.0 // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
github.com/fxamacker/cbor/v2 v2.5.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand All @@ -47,15 +48,16 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.flow.arcalot.io/testplugin v0.1.0 // indirect
go.flow.arcalot.io/testplugin v0.2.1 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/oauth2 v0.2.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.2.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
Expand Down
Loading

0 comments on commit b292124

Please sign in to comment.