Skip to content

Commit

Permalink
add file context (#142)
Browse files Browse the repository at this point in the history
* add file context

* remove dead code

* fix

* refactor more

* code to load subworkflow into cache

* remove subworkflow commit

* add tests

* remove dead code

* remove more dead code

* add function

* move interface comment

* add merge file cache test

* add assertion for abspath by key

* changes

* fixes

* add exit code for missing config file key

* fix

* fix lint

* add main func for test directory cleanup

* remove superfluous code
  • Loading branch information
mfleader authored Jan 18, 2024
1 parent 55b6b65 commit 0338e07
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 117 deletions.
38 changes: 19 additions & 19 deletions cmd/arcaflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,28 @@ Options:
RequiredFileKeyWorkflow: workflowFile,
}

requiredFilesAbsPaths, err := loadfile.AbsPathsWithContext(dir, requiredFiles)
fileCtx, err := loadfile.NewFileCacheUsingContext(dir, requiredFiles)
if err != nil {
flag.Usage()
tempLogger.Errorf("context path resolution failed %s (%v)", dir, err)
os.Exit(ExitCodeInvalidData)
}

var configData any = map[any]any{}
if configFile != "" {
configData, err = loadYamlFile(requiredFilesAbsPaths[RequiredFileKeyConfig])
if err != nil {
tempLogger.Errorf("Failed to load configuration file %s (%v)", configFile, err)
flag.Usage()
os.Exit(ExitCodeInvalidData)
}
configFilePath, err := fileCtx.AbsPathByKey(RequiredFileKeyConfig)
if err != nil {
tempLogger.Errorf("Unable to find configuration file %s (%v)", configFile, err)
flag.Usage()
os.Exit(ExitCodeInvalidData)
}

var configData any
configData, err = loadYamlFile(configFilePath)
if err != nil {
tempLogger.Errorf("Failed to load configuration file %s (%v)", configFile, err)
flag.Usage()
os.Exit(ExitCodeInvalidData)
}

cfg, err := config.Load(configData)
if err != nil {
tempLogger.Errorf("Failed to load configuration file %s (%v)", configFile, err)
Expand All @@ -157,13 +163,7 @@ Options:
cfg.Log.Stdout = os.Stderr
logger := log.New(cfg.Log).WithLabel("source", "main")

var requiredFilesAbsSlice = make([]string, len(requiredFilesAbsPaths))
var j int
for _, f := range requiredFilesAbsPaths {
requiredFilesAbsSlice[j] = f
j++
}
dirContext, err := loadfile.LoadContext(requiredFilesAbsSlice)
err = fileCtx.LoadContext()
if err != nil {
logger.Errorf("Failed to load required files into context (%v)", err)
flag.Usage()
Expand All @@ -187,10 +187,10 @@ Options:
}
}

os.Exit(runWorkflow(flow, dirContext, requiredFilesAbsPaths[RequiredFileKeyWorkflow], logger, inputData))
os.Exit(runWorkflow(flow, fileCtx, RequiredFileKeyWorkflow, logger, inputData))
}

func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workflowFile string, logger log.Logger, inputData []byte) int {
func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflowFile string, logger log.Logger, inputData []byte) int {
ctx, cancel := context.WithCancel(context.Background())
ctrlC := make(chan os.Signal, 4) // We expect up to three ctrl-C inputs. Plus one extra to buffer in case.
signal.Notify(ctrlC, os.Interrupt)
Expand All @@ -201,7 +201,7 @@ func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workf
cancel()
}()

workflow, err := flow.Parse(dirContext, workflowFile)
workflow, err := flow.Parse(fileCtx, workflowFile)
if err != nil {
logger.Errorf("Invalid workflow (%v)", err)
return ExitCodeInvalidData
Expand Down
15 changes: 8 additions & 7 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.flow.arcalot.io/engine/config"
"go.flow.arcalot.io/engine/internal/step"
"go.flow.arcalot.io/engine/internal/yaml"
"go.flow.arcalot.io/engine/loadfile"
"go.flow.arcalot.io/engine/workflow"
"go.flow.arcalot.io/pluginsdk/schema"
)
Expand All @@ -22,14 +23,14 @@ type WorkflowEngine interface {
RunWorkflow(
ctx context.Context,
input []byte,
workflowContext map[string][]byte,
workflowContext loadfile.FileCache,
workflowFileName string,
) (outputID string, outputData any, outputError bool, err error)

// Parse ingests a workflow context as a map of files to their contents and a workflow file name and
// parses the data into an executable workflow.
Parse(
workflowContext map[string][]byte,
workflowContext loadfile.FileCache,
workflowFileName string,
) (
workflow Workflow,
Expand Down Expand Up @@ -65,7 +66,7 @@ type workflowEngine struct {
func (w workflowEngine) RunWorkflow(
ctx context.Context,
input []byte,
workflowContext map[string][]byte,
workflowContext loadfile.FileCache,
workflowFileName string,
) (outputID string, outputData any, outputError bool, err error) {
wf, err := w.Parse(workflowContext, workflowFileName)
Expand All @@ -76,14 +77,14 @@ func (w workflowEngine) RunWorkflow(
}

func (w workflowEngine) Parse(
files map[string][]byte,
files loadfile.FileCache,
workflowFileName string,
) (Workflow, error) {
if workflowFileName == "" {
workflowFileName = "workflow.yaml"
}
workflowContents, ok := files[workflowFileName]
if !ok {
workflowContents, err := files.ContentByKey(workflowFileName)
if err != nil {
return nil, ErrNoWorkflowFile
}

Expand All @@ -104,7 +105,7 @@ func (w workflowEngine) Parse(
return nil, err
}

preparedWorkflow, err := executor.Prepare(wf, files)
preparedWorkflow, err := executor.Prepare(wf, files.Contents())
if err != nil {
return nil, err
}
Expand Down
80 changes: 48 additions & 32 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package engine_test
import (
"context"
"errors"
"go.flow.arcalot.io/engine/loadfile"
"testing"

log "go.arcalot.io/log/v2"
Expand Down Expand Up @@ -35,10 +36,11 @@ func createTestEngine(t *testing.T) engine.WorkflowEngine {
}

func TestNoWorkflowFile(t *testing.T) {
fileCache := loadfile.NewFileCache("", map[string][]byte{})
_, _, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
nil,
map[string][]byte{},
fileCache,
"",
)
assert.Error(t, err)
Expand All @@ -49,12 +51,14 @@ func TestNoWorkflowFile(t *testing.T) {
}

func TestEmptyWorkflowFile(t *testing.T) {
fileCache := loadfile.NewFileCache("",
map[string][]byte{
"workflow.yaml": {},
})
_, _, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
nil,
map[string][]byte{
"workflow.yaml": {},
},
fileCache,
"",
)
assert.Error(t, err)
Expand All @@ -65,13 +69,15 @@ func TestEmptyWorkflowFile(t *testing.T) {
}

func TestInvalidYAML(t *testing.T) {
content := map[string][]byte{
"workflow.yaml": []byte(`: foo
bar`),
}
fileCache := loadfile.NewFileCache("", content)
_, _, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
nil,
map[string][]byte{
"workflow.yaml": []byte(`: foo
bar`),
},
fileCache,
"",
)
assert.Error(t, err)
Expand All @@ -83,12 +89,14 @@ func TestInvalidYAML(t *testing.T) {
}

func TestInvalidWorkflow(t *testing.T) {
content := map[string][]byte{
"workflow.yaml": []byte(`test: Hello world!`),
}
fileCache := loadfile.NewFileCache("", content)
_, _, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
nil,
map[string][]byte{
"workflow.yaml": []byte(`test: Hello world!`),
},
fileCache,
"",
)
assert.Error(t, err)
Expand All @@ -100,40 +108,41 @@ func TestInvalidWorkflow(t *testing.T) {
}

func TestEmptySteps(t *testing.T) {
content := map[string][]byte{
"workflow.yaml": []byte(`version: v0.2.0
output: []
steps: []`),
}
fileCache := loadfile.NewFileCache("", content)
_, _, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
nil,
map[string][]byte{
"workflow.yaml": []byte(`version: v0.2.0
output: []
steps: []`),
},
fileCache,
"",
)
assert.Error(t, err)
assert.Equals(t, outputError, true)
}

func TestNoSteps(t *testing.T) {
content := map[string][]byte{
"workflow.yaml": []byte(`version: v0.2.0
output: []`),
}
fileCache := loadfile.NewFileCache("", content)
_, _, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
nil,
map[string][]byte{
"workflow.yaml": []byte(`version: v0.2.0
output: []`),
},
fileCache,
"",
)
assert.Error(t, err)
assert.Equals(t, outputError, true)
}

func TestE2E(t *testing.T) {
outputID, outputData, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
[]byte(`name: Arca Lot`),
map[string][]byte{
"workflow.yaml": []byte(`version: v0.2.0
content := map[string][]byte{
"workflow.yaml": []byte(`version: v0.2.0
input:
root: RootObject
objects:
Expand All @@ -152,7 +161,12 @@ steps:
name: !expr $.input.name
output:
message: !expr $.steps.example.outputs.success.message`),
},
}
fileCache := loadfile.NewFileCache("", content)
outputID, outputData, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
[]byte(`name: Arca Lot`),
fileCache,
"",
)
assert.NoError(t, err)
Expand All @@ -162,11 +176,8 @@ output:
}

func TestE2EMultipleOutputs(t *testing.T) {
outputID, outputData, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
[]byte(`name: Arca Lot`),
map[string][]byte{
"workflow.yaml": []byte(`version: v0.2.0
content := map[string][]byte{
"workflow.yaml": []byte(`version: v0.2.0
input:
root: RootObject
objects:
Expand All @@ -186,7 +197,12 @@ steps:
outputs:
success:
message: !expr $.steps.example.outputs.success.message`),
},
}
fileCache := loadfile.NewFileCache("", content)
outputID, outputData, outputError, err := createTestEngine(t).RunWorkflow(
context.Background(),
[]byte(`name: Arca Lot`),
fileCache,
"",
)
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit 0338e07

Please sign in to comment.