Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ATP v3, Validate Compatibility, Improved stop_if, better logs, retry fix #114

Merged
merged 27 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c6fc1cd
Added workflow pre-validation
jaredoconnell Aug 4, 2023
5e5659a
Updated go mod/sum
jaredoconnell Aug 4, 2023
25fa40e
Progress towards debugging deadlock detection
jaredoconnell Sep 6, 2023
8188317
Use image that supports ARM64
jaredoconnell Sep 14, 2023
e715add
Ensure that run finishes before closing the step
jaredoconnell Sep 14, 2023
ac0ef7e
Added deadlock check retries
jaredoconnell Sep 14, 2023
c5696ec
Update test cases to take less time
jaredoconnell Sep 14, 2023
4c72b39
Update dependencies
jaredoconnell Sep 14, 2023
54503ae
Update go SDK to dev branch
jaredoconnell Sep 14, 2023
9ad0570
Updated test deployer to dev branch
jaredoconnell Sep 14, 2023
1770f19
Updated deployers, and added python deployer to run-plugin
jaredoconnell Sep 18, 2023
2fe30be
Label loggers
jaredoconnell Sep 18, 2023
cb57e6c
Update to use ATP v3
jaredoconnell Oct 20, 2023
30b9c23
Update SDK in go mod, and add force stop to step interface
jaredoconnell Oct 20, 2023
9088d54
Merge branch 'main' into atp-v2
jaredoconnell Oct 20, 2023
4f87b19
Cleanup
jaredoconnell Oct 20, 2023
e1768b5
Fix tests
jaredoconnell Oct 20, 2023
f165777
Update SDK and test deployer
jaredoconnell Oct 24, 2023
d8c64bc
Reduce redundancy in Plugin Provider
jaredoconnell Oct 24, 2023
ce8c752
Refactoring to fix linting errors
jaredoconnell Oct 24, 2023
ec46f7b
Fix linting errors
jaredoconnell Oct 24, 2023
58e5bf0
bump internal dependencies
dustinblack Oct 25, 2023
3fe61d5
update all internal dependencies
dustinblack Oct 25, 2023
1e4ac9c
Remove fragile test
jaredoconnell Oct 25, 2023
628f550
Merge remote-tracking branch 'origin/atp-v2' into atp-v2
jaredoconnell Oct 25, 2023
770c92d
Merge branch 'main' into atp-v2
dustinblack Oct 26, 2023
9fb46b7
bump python deployer to 0.3.0
dustinblack Oct 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/arcaflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Options:
}
cfg.Log.Stdout = os.Stderr

logger := log.New(cfg.Log)
logger := log.New(cfg.Log).WithLabel("source", "main")

dirContext, err := loadContext(dir)
if err != nil {
Expand Down
64 changes: 50 additions & 14 deletions cmd/run-plugin/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"flag"
"fmt"
"go.flow.arcalot.io/deployer"
"go.flow.arcalot.io/pluginsdk/plugin"
"go.flow.arcalot.io/pluginsdk/schema"
podman "go.flow.arcalot.io/podmandeployer"
pythondeployer "go.flow.arcalot.io/pythondeployer"
"os"
"os/signal"

Expand All @@ -21,14 +24,19 @@ func main() {
var image string
var file string
var stepID string
var pythonPath string
var d deployer.AnyConnectorFactory
var defaultConfig any
var deployerID = "docker"
var runID = "run"
var workingDir string

flag.StringVar(&image, "image", image, "Docker image to run")
flag.StringVar(&file, "file", file, "Input file")
flag.StringVar(&stepID, "step", stepID, "Step name")
flag.StringVar(&deployerID, "deployer", stepID, "The name of the deployer")
flag.StringVar(&pythonPath, "pythonpath", "", "Path to the Python environment")
flag.StringVar(&workingDir, "workingdir", "~/", "Path to store cloned repositories")
flag.Parse()

switch deployerID {
Expand Down Expand Up @@ -60,35 +68,56 @@ func main() {
if err != nil {
panic(err)
}
case "python":
pythonFactory := pythondeployer.NewFactory()
d = deployer.Any(pythonFactory)
configSchema := pythonFactory.ConfigurationSchema()
var err error
configInput := map[string]any{}
configInput["pythonPath"] = pythonPath
configInput["workdir"] = workingDir
defaultConfig, err = configSchema.UnserializeType(configInput)
if err != nil {
panic(err)
}
default:
panic("No deployer or invalid deployer selected. Options: docker, podman, testimpl. Select with -deployer")
panic("No deployer or invalid deployer selected. Options: docker, podman, testimpl, python. Select with -deployer")
}

connector, err := d.Create(defaultConfig, log.New(log.Config{
logger := log.New(log.Config{
Level: log.LevelDebug,
Destination: log.DestinationStdout,
}))
})
connector, err := d.Create(defaultConfig, logger)
if err != nil {
panic(err)
}
ctrlC := make(chan os.Signal, 1)
signal.Notify(ctrlC, os.Interrupt)

// Set up the signal channel to send cancel signal on ctrl-c
toStepSignals := make(chan schema.Input, 3)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-ctrlC:
fmt.Println("Received CTRL-C. Cancelling the context to cancel the step...")
logger.Infof("Received CTRL-C. Sending cancel signal...")
toStepSignals <- schema.Input{
RunID: runID,
ID: plugin.CancellationSignalSchema.ID(),
InputData: make(map[string]any),
}
logger.Infof("Signal request sent to ATP client.")
cancel()
case <-ctx.Done():
// Done here.
}
}()

fmt.Println("Deploying")
logger.Infof("Deploying")
plugin, err := connector.Deploy(ctx, image)
if err != nil {
logger.Errorf("Error while deploying: %s", err)
panic(err)
}
defer func() {
Expand All @@ -97,8 +126,8 @@ func main() {
}
}()

atpClient := atp.NewClient(plugin)
fmt.Println("Getting schema")
atpClient := atp.NewClientWithLogger(plugin, logger)
logger.Infof("Getting schema")
pluginSchema, err := atpClient.ReadSchema()
if err != nil {
panic(err)
Expand All @@ -120,15 +149,22 @@ func main() {
panic(err)
}
fmt.Printf("Running step %s\n", stepID)
outputID, outputData, err := atpClient.Execute(stepID, input)
result := atpClient.Execute(
schema.Input{RunID: runID, ID: stepID, InputData: input},
toStepSignals,
nil,
)
if err := atpClient.Close(); err != nil {
fmt.Printf("Error closing ATP client: %s", err)
}
output := map[string]any{
"outputID": outputID,
"outputData": outputData,
"err": err,
"outputID": result.OutputID,
"outputData": result.OutputData,
"err": result.Error,
}
result, err := yaml.Marshal(output)
resultStr, err := yaml.Marshal(output)
if err != nil {
panic(err)
}
fmt.Printf("%s", result)
fmt.Printf("%s", resultStr)
}
4 changes: 2 additions & 2 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ input:
type_id: string
steps:
example:
plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.1.0
plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1
input:
name: !expr $.input.name
output:
Expand Down Expand Up @@ -174,7 +174,7 @@ input:
type_id: string
steps:
example:
plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.1.0
plugin: quay.io/arcalot/arcaflow-plugin-template-python:0.2.1
input:
name: !expr $.input.name
outputs:
Expand Down
30 changes: 16 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,32 @@ module go.flow.arcalot.io/engine
go 1.18

require (
go.arcalot.io/assert v1.3.0
go.arcalot.io/assert v1.6.0
go.arcalot.io/dgraph v1.1.0
go.arcalot.io/lang v1.0.0
go.arcalot.io/log/v2 v2.0.0
go.flow.arcalot.io/deployer v0.2.0
go.flow.arcalot.io/dockerdeployer v0.3.0
go.flow.arcalot.io/expressions v0.2.0
go.flow.arcalot.io/kubernetesdeployer v0.5.1
go.flow.arcalot.io/pluginsdk v0.3.0-beta.1
go.flow.arcalot.io/podmandeployer v0.3.1
go.flow.arcalot.io/pythondeployer v0.1.3
go.flow.arcalot.io/testdeployer v0.2.0
go.flow.arcalot.io/deployer v0.3.0
go.flow.arcalot.io/dockerdeployer v0.4.0
go.flow.arcalot.io/expressions v0.2.1
go.flow.arcalot.io/kubernetesdeployer v0.7.0
go.flow.arcalot.io/pluginsdk v0.5.0
go.flow.arcalot.io/podmandeployer v0.5.0
go.flow.arcalot.io/pythondeployer v0.3.0
go.flow.arcalot.io/testdeployer v0.3.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.5+incompatible // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/docker v24.0.6+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.10.0 // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
github.com/fxamacker/cbor/v2 v2.5.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand All @@ -47,15 +48,16 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.flow.arcalot.io/testplugin v0.1.0 // indirect
go.flow.arcalot.io/testplugin v0.2.1 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/oauth2 v0.2.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.2.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
Expand Down
Loading