From 0338e071fd2a917f3e3e511a7a01e65ec3632b44 Mon Sep 17 00:00:00 2001 From: Matt Leader Date: Thu, 18 Jan 2024 16:38:52 -0500 Subject: [PATCH] add file context (#142) * add file context * remove dead code * fix * refactor more * code to load subworkflow into cache * remove subworkflow commit * add tests * remove dead code * remove more dead code * add function * move interface comment * add merge file cache test * add assertion for abspath by key * changes * fixes * add exit code for missing config file key * fix * fix lint * add main func for test directory cleanup * remove superfluous code --- cmd/arcaflow/main.go | 38 ++++----- engine.go | 15 ++-- engine_test.go | 80 ++++++++++------- loadfile/loadfile.go | 175 ++++++++++++++++++++++++++++++++------ loadfile/loadfile_test.go | 136 ++++++++++++++++++++++------- 5 files changed, 327 insertions(+), 117 deletions(-) diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 9be3b58e..fe05d062 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -130,22 +130,28 @@ Options: RequiredFileKeyWorkflow: workflowFile, } - requiredFilesAbsPaths, err := loadfile.AbsPathsWithContext(dir, requiredFiles) + fileCtx, err := loadfile.NewFileCacheUsingContext(dir, requiredFiles) if err != nil { flag.Usage() tempLogger.Errorf("context path resolution failed %s (%v)", dir, err) os.Exit(ExitCodeInvalidData) } - var configData any = map[any]any{} - if configFile != "" { - configData, err = loadYamlFile(requiredFilesAbsPaths[RequiredFileKeyConfig]) - if err != nil { - tempLogger.Errorf("Failed to load configuration file %s (%v)", configFile, err) - flag.Usage() - os.Exit(ExitCodeInvalidData) - } + configFilePath, err := fileCtx.AbsPathByKey(RequiredFileKeyConfig) + if err != nil { + tempLogger.Errorf("Unable to find configuration file %s (%v)", configFile, err) + flag.Usage() + os.Exit(ExitCodeInvalidData) } + + var configData any + configData, err = loadYamlFile(configFilePath) + if err != nil { + tempLogger.Errorf("Failed to load configuration file %s (%v)", configFile, err) + flag.Usage() + os.Exit(ExitCodeInvalidData) + } + cfg, err := config.Load(configData) if err != nil { tempLogger.Errorf("Failed to load configuration file %s (%v)", configFile, err) @@ -157,13 +163,7 @@ Options: cfg.Log.Stdout = os.Stderr logger := log.New(cfg.Log).WithLabel("source", "main") - var requiredFilesAbsSlice = make([]string, len(requiredFilesAbsPaths)) - var j int - for _, f := range requiredFilesAbsPaths { - requiredFilesAbsSlice[j] = f - j++ - } - dirContext, err := loadfile.LoadContext(requiredFilesAbsSlice) + err = fileCtx.LoadContext() if err != nil { logger.Errorf("Failed to load required files into context (%v)", err) flag.Usage() @@ -187,10 +187,10 @@ Options: } } - os.Exit(runWorkflow(flow, dirContext, requiredFilesAbsPaths[RequiredFileKeyWorkflow], logger, inputData)) + os.Exit(runWorkflow(flow, fileCtx, RequiredFileKeyWorkflow, logger, inputData)) } -func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workflowFile string, logger log.Logger, inputData []byte) int { +func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflowFile string, logger log.Logger, inputData []byte) int { ctx, cancel := context.WithCancel(context.Background()) ctrlC := make(chan os.Signal, 4) // We expect up to three ctrl-C inputs. Plus one extra to buffer in case. signal.Notify(ctrlC, os.Interrupt) @@ -201,7 +201,7 @@ func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workf cancel() }() - workflow, err := flow.Parse(dirContext, workflowFile) + workflow, err := flow.Parse(fileCtx, workflowFile) if err != nil { logger.Errorf("Invalid workflow (%v)", err) return ExitCodeInvalidData diff --git a/engine.go b/engine.go index 0411a9dc..959bcc3d 100644 --- a/engine.go +++ b/engine.go @@ -8,6 +8,7 @@ import ( "go.flow.arcalot.io/engine/config" "go.flow.arcalot.io/engine/internal/step" "go.flow.arcalot.io/engine/internal/yaml" + "go.flow.arcalot.io/engine/loadfile" "go.flow.arcalot.io/engine/workflow" "go.flow.arcalot.io/pluginsdk/schema" ) @@ -22,14 +23,14 @@ type WorkflowEngine interface { RunWorkflow( ctx context.Context, input []byte, - workflowContext map[string][]byte, + workflowContext loadfile.FileCache, workflowFileName string, ) (outputID string, outputData any, outputError bool, err error) // Parse ingests a workflow context as a map of files to their contents and a workflow file name and // parses the data into an executable workflow. Parse( - workflowContext map[string][]byte, + workflowContext loadfile.FileCache, workflowFileName string, ) ( workflow Workflow, @@ -65,7 +66,7 @@ type workflowEngine struct { func (w workflowEngine) RunWorkflow( ctx context.Context, input []byte, - workflowContext map[string][]byte, + workflowContext loadfile.FileCache, workflowFileName string, ) (outputID string, outputData any, outputError bool, err error) { wf, err := w.Parse(workflowContext, workflowFileName) @@ -76,14 +77,14 @@ func (w workflowEngine) RunWorkflow( } func (w workflowEngine) Parse( - files map[string][]byte, + files loadfile.FileCache, workflowFileName string, ) (Workflow, error) { if workflowFileName == "" { workflowFileName = "workflow.yaml" } - workflowContents, ok := files[workflowFileName] - if !ok { + workflowContents, err := files.ContentByKey(workflowFileName) + if err != nil { return nil, ErrNoWorkflowFile } @@ -104,7 +105,7 @@ func (w workflowEngine) Parse( return nil, err } - preparedWorkflow, err := executor.Prepare(wf, files) + preparedWorkflow, err := executor.Prepare(wf, files.Contents()) if err != nil { return nil, err } diff --git a/engine_test.go b/engine_test.go index be8dd1f0..591174d6 100644 --- a/engine_test.go +++ b/engine_test.go @@ -3,6 +3,7 @@ package engine_test import ( "context" "errors" + "go.flow.arcalot.io/engine/loadfile" "testing" log "go.arcalot.io/log/v2" @@ -35,10 +36,11 @@ func createTestEngine(t *testing.T) engine.WorkflowEngine { } func TestNoWorkflowFile(t *testing.T) { + fileCache := loadfile.NewFileCache("", map[string][]byte{}) _, _, outputError, err := createTestEngine(t).RunWorkflow( context.Background(), nil, - map[string][]byte{}, + fileCache, "", ) assert.Error(t, err) @@ -49,12 +51,14 @@ func TestNoWorkflowFile(t *testing.T) { } func TestEmptyWorkflowFile(t *testing.T) { + fileCache := loadfile.NewFileCache("", + map[string][]byte{ + "workflow.yaml": {}, + }) _, _, outputError, err := createTestEngine(t).RunWorkflow( context.Background(), nil, - map[string][]byte{ - "workflow.yaml": {}, - }, + fileCache, "", ) assert.Error(t, err) @@ -65,13 +69,15 @@ func TestEmptyWorkflowFile(t *testing.T) { } func TestInvalidYAML(t *testing.T) { + content := map[string][]byte{ + "workflow.yaml": []byte(`: foo + bar`), + } + fileCache := loadfile.NewFileCache("", content) _, _, outputError, err := createTestEngine(t).RunWorkflow( context.Background(), nil, - map[string][]byte{ - "workflow.yaml": []byte(`: foo - bar`), - }, + fileCache, "", ) assert.Error(t, err) @@ -83,12 +89,14 @@ func TestInvalidYAML(t *testing.T) { } func TestInvalidWorkflow(t *testing.T) { + content := map[string][]byte{ + "workflow.yaml": []byte(`test: Hello world!`), + } + fileCache := loadfile.NewFileCache("", content) _, _, outputError, err := createTestEngine(t).RunWorkflow( context.Background(), nil, - map[string][]byte{ - "workflow.yaml": []byte(`test: Hello world!`), - }, + fileCache, "", ) assert.Error(t, err) @@ -100,14 +108,16 @@ func TestInvalidWorkflow(t *testing.T) { } func TestEmptySteps(t *testing.T) { + content := map[string][]byte{ + "workflow.yaml": []byte(`version: v0.2.0 +output: [] +steps: []`), + } + fileCache := loadfile.NewFileCache("", content) _, _, outputError, err := createTestEngine(t).RunWorkflow( context.Background(), nil, - map[string][]byte{ - "workflow.yaml": []byte(`version: v0.2.0 -output: [] -steps: []`), - }, + fileCache, "", ) assert.Error(t, err) @@ -115,13 +125,15 @@ steps: []`), } func TestNoSteps(t *testing.T) { + content := map[string][]byte{ + "workflow.yaml": []byte(`version: v0.2.0 +output: []`), + } + fileCache := loadfile.NewFileCache("", content) _, _, outputError, err := createTestEngine(t).RunWorkflow( context.Background(), nil, - map[string][]byte{ - "workflow.yaml": []byte(`version: v0.2.0 -output: []`), - }, + fileCache, "", ) assert.Error(t, err) @@ -129,11 +141,8 @@ output: []`), } func TestE2E(t *testing.T) { - outputID, outputData, outputError, err := createTestEngine(t).RunWorkflow( - context.Background(), - []byte(`name: Arca Lot`), - map[string][]byte{ - "workflow.yaml": []byte(`version: v0.2.0 + content := map[string][]byte{ + "workflow.yaml": []byte(`version: v0.2.0 input: root: RootObject objects: @@ -152,7 +161,12 @@ steps: name: !expr $.input.name output: message: !expr $.steps.example.outputs.success.message`), - }, + } + fileCache := loadfile.NewFileCache("", content) + outputID, outputData, outputError, err := createTestEngine(t).RunWorkflow( + context.Background(), + []byte(`name: Arca Lot`), + fileCache, "", ) assert.NoError(t, err) @@ -162,11 +176,8 @@ output: } func TestE2EMultipleOutputs(t *testing.T) { - outputID, outputData, outputError, err := createTestEngine(t).RunWorkflow( - context.Background(), - []byte(`name: Arca Lot`), - map[string][]byte{ - "workflow.yaml": []byte(`version: v0.2.0 + content := map[string][]byte{ + "workflow.yaml": []byte(`version: v0.2.0 input: root: RootObject objects: @@ -186,7 +197,12 @@ steps: outputs: success: message: !expr $.steps.example.outputs.success.message`), - }, + } + fileCache := loadfile.NewFileCache("", content) + outputID, outputData, outputError, err := createTestEngine(t).RunWorkflow( + context.Background(), + []byte(`name: Arca Lot`), + fileCache, "", ) assert.NoError(t, err) diff --git a/loadfile/loadfile.go b/loadfile/loadfile.go index 242b5188..1c8cfe9d 100644 --- a/loadfile/loadfile.go +++ b/loadfile/loadfile.go @@ -7,41 +7,164 @@ import ( "path/filepath" ) -// LoadContext reads the content of each file into a map where the key -// is the absolute filepath and the file content is the value. -func LoadContext(neededFilepaths []string) (map[string][]byte, error) { - result := map[string][]byte{} - var err error - for _, filePath := range neededFilepaths { - absPath, err := filepath.Abs(filePath) - if err != nil { - return nil, fmt.Errorf("error obtaining absolute path of file %s (%w)", - filePath, err) - } - fileData, err := os.ReadFile(filepath.Clean(absPath)) - if err != nil { - return nil, fmt.Errorf("error reading file %s (%w)", absPath, err) - } - result[absPath] = fileData - } - return result, err +// ContextFile is a file whose absolute file path and content +// need to be referenced at some point during execution. +type ContextFile struct { + ID string + AbsolutePath string + Content []byte +} + +type fileCache struct { + // Root directory for given relative filepaths + rootDir string + // Mapping of unique keys that reference a file needed for execution + files map[string]ContextFile } -// AbsPathsWithContext creates a map of absolute filepaths. If a required -// file is not provided with an absolute path, then it is joined with the -// root directory. -func AbsPathsWithContext(rootDir string, requiredFiles map[string]string) (map[string]string, error) { +// FileCache is a container of ContextFiles, and a context (root) +// directory path to be used in conjunction with files that do not +// provide an absolute file path. +type FileCache interface { + RootDir() string + LoadContext() error + GetByKey(fileKey string) (*ContextFile, error) + AbsPathByKey(fileKey string) (string, error) + ContentByKey(fileKey string) ([]byte, error) + Contents() map[string][]byte + Files() map[string]ContextFile +} + +// NewFileCacheUsingContext returns a FileCache object containing the provided root directory string and a +// new map based on the provided map. The new map uses the same keys as the input but their corresponding +// values contain absolute file paths. Relative file paths provided in the input map are prefixed with the root +// directory before being included in the output map. +func NewFileCacheUsingContext(rootDir string, files map[string]string) (FileCache, error) { absDir, err := filepath.Abs(rootDir) if err != nil { return nil, fmt.Errorf("error determining context directory absolute path %s (%w)", rootDir, err) } - requiredFilesAbs := map[string]string{} - for key, f := range requiredFiles { + filesAbsPaths := map[string]ContextFile{} + for key, f := range files { abspath := f if !filepath.IsAbs(f) { abspath = filepath.Join(absDir, f) } - requiredFilesAbs[key] = abspath + filesAbsPaths[key] = ContextFile{ + ID: f, + AbsolutePath: abspath, + } + } + return &fileCache{ + rootDir: absDir, files: filesAbsPaths}, nil +} + +// NewFileCache returns a FileCache object containing provided +// root directory string and a new map based on the provided +// map. The new map uses the same keys as the input and the +// corresponding values include the contents from the input; +// the key value is also used as the ID and the absolute path for +// the entries. +func NewFileCache(rootDir string, fileContents map[string][]byte) FileCache { + files := map[string]ContextFile{} + for key, content := range fileContents { + files[key] = ContextFile{ + ID: key, + AbsolutePath: key, + Content: content, + } + } + return &fileCache{ + rootDir: rootDir, + files: files, + } +} + +// LoadContext reads the content of each context file into Content. +func (fc *fileCache) LoadContext() error { + result := map[string]ContextFile{} + var err error + for key, cf := range fc.files { + absPath := cf.AbsolutePath + fileData, err := os.ReadFile(filepath.Clean(absPath)) + if err != nil { + return fmt.Errorf("error reading file %s (%w)", absPath, err) + } + result[key] = ContextFile{ + ID: cf.ID, + AbsolutePath: cf.AbsolutePath, + Content: fileData, + } + } + fc.files = result + return err +} + +// RootDir returns the root directory used by files with +// relative file paths. +func (fc *fileCache) RootDir() string { + return fc.rootDir +} + +// GetByKey asks if the file cache contains the given fileKey. +func (fc *fileCache) GetByKey(fileKey string) (*ContextFile, error) { + cf, ok := fc.files[fileKey] + if !ok { + return nil, fmt.Errorf("file cache does not contain %q", fileKey) + } + return &cf, nil +} + +// AbsPathByKey returns the absolute file path of a given file key, +// if it exists in the file cache, nil otherwise. +func (fc *fileCache) AbsPathByKey(fileKey string) (string, error) { + cf, err := fc.GetByKey(fileKey) + if err != nil { + return "", err + } + return cf.AbsolutePath, nil +} + +// ContentByKey returns the file content of a given file key, if it +// exists in the file cache, nil otherwise. +func (fc *fileCache) ContentByKey(fileKey string) ([]byte, error) { + cf, err := fc.GetByKey(fileKey) + if err != nil { + return nil, err + } + return cf.Content, nil +} + +// Contents returns a mapping of the file cache's file keys to file Content. +func (fc *fileCache) Contents() map[string][]byte { + result := map[string][]byte{} + for key, f := range fc.files { + result[key] = f.Content + } + return result +} + +// Files returns the mapping of file keys to their ContextFile. +func (fc *fileCache) Files() map[string]ContextFile { + return fc.files +} + +// MergeFileCaches merges any number of file caches into one file cache. +// The new root directory will be the root directory of the last file cache +// argument. File keys found later in iteration will overwrite previously +// file keys, if there is a name clash. +func MergeFileCaches(fileCaches ...FileCache) FileCache { + cache := map[string]ContextFile{} + rootDir := "" + + for _, fc := range fileCaches { + for key, contextFile := range fc.Files() { + cache[key] = contextFile + } + rootDir = fc.RootDir() + } + return &fileCache{ + rootDir: rootDir, + files: cache, } - return requiredFilesAbs, nil } diff --git a/loadfile/loadfile_test.go b/loadfile/loadfile_test.go index e5c7dd32..98d59426 100644 --- a/loadfile/loadfile_test.go +++ b/loadfile/loadfile_test.go @@ -3,6 +3,7 @@ package loadfile_test import ( "go.arcalot.io/assert" "go.flow.arcalot.io/engine/loadfile" + "log" "os" "path/filepath" "testing" @@ -15,11 +16,8 @@ import ( // engine should only attempt to read files of a type that the os can // read (i.e. not throw an error on a call to os.ReadFile()), and // disregard (not throw an error) files with a type it cannot read. -func TestLoadContext(t *testing.T) { - testdir := "/tmp/loadfile-test" - // cleanup directory even if it's there - _ = os.RemoveAll(testdir) - +func Test_LoadContext(t *testing.T) { + testdir := filepath.Join(TestDir, "load-ctx") assert.NoError(t, os.MkdirAll(testdir, os.ModePerm)) // create a directory @@ -43,52 +41,52 @@ func TestLoadContext(t *testing.T) { symlinkFilepath := filePath + "_sym" assert.NoError(t, os.Symlink(filePath, symlinkFilepath)) - neededFiles := []string{ - filePath, - symlinkFilepath, + neededFiles := map[string]string{ + filePath: filePath, + symlinkFilepath: symlinkFilepath, } - filemap, err := loadfile.LoadContext(neededFiles) + fc, err := loadfile.NewFileCacheUsingContext(testdir, neededFiles) // assert no error on attempting to read files // that cannot be read assert.NoError(t, err) + err = fc.LoadContext() + assert.NoError(t, err) // assert only the regular and symlinked file are loaded filemapExp := map[string][]byte{ filePath: {}, symlinkFilepath: {}, } - assert.Equals(t, filemap, filemapExp) + assert.Equals(t, fc.Contents(), filemapExp) + errFileRead := "reading file" // error on loading a directory - neededFiles = []string{ - dirpath, + neededFiles = map[string]string{ + dirpath: dirpath, } - - errFileRead := "reading file" - ctxFiles, err := loadfile.LoadContext(neededFiles) + fc, err = loadfile.NewFileCacheUsingContext(testdir, neededFiles) + assert.NoError(t, err) + err = fc.LoadContext() assert.Error(t, err) assert.Contains(t, err.Error(), errFileRead) - assert.Nil(t, ctxFiles) // error on loading a symlink directory - neededFiles = []string{ - symlinkDirpath, + neededFiles = map[string]string{ + symlinkDirpath: symlinkDirpath, } - ctxFiles, err = loadfile.LoadContext(neededFiles) + fc, err = loadfile.NewFileCacheUsingContext(testdir, neededFiles) + assert.NoError(t, err) + err = fc.LoadContext() assert.Error(t, err) assert.Contains(t, err.Error(), errFileRead) - assert.Nil(t, ctxFiles) - - t.Cleanup(func() { - assert.NoError(t, os.RemoveAll(testdir)) - }) } -// This tests AbsPathsWithContext which joins relative paths -// with the context (root) directory, and passes through -// absolute paths unmodified. -func TestContextAbsFilepaths(t *testing.T) { - testdir, err := os.MkdirTemp(os.TempDir(), "") +// This tests the construction of a new file cache, and the +// determination of the absolute file paths. The file cache +// joins relative paths with the context (root) directory, +// and passes through absolute paths unmodified. +func Test_NewFileCache(t *testing.T) { + testdir, err := os.MkdirTemp(TestDir, "") assert.NoError(t, err) testFilepaths := map[string]string{ @@ -103,11 +101,83 @@ func TestContextAbsFilepaths(t *testing.T) { "c": filepath.Join(testdir, testFilepaths["c"]), } - absPathsGot, err := loadfile.AbsPathsWithContext(testdir, testFilepaths) + fc, err := loadfile.NewFileCacheUsingContext(testdir, testFilepaths) assert.NoError(t, err) + absPathsGot := FileCacheAbsPaths(fc) assert.Equals(t, absPathsExp, absPathsGot) + // test file key not in file cache returns nil + _, err = fc.AbsPathByKey("") + assert.Error(t, err) + assert.Contains(t, err.Error(), "file cache does not contain") +} + +// This tests that the merge file cache combines the contents +// of two file caches appropriately, where non-unique file keys +// are overwritten by a later file cache argument. +func Test_MergeFileCaches(t *testing.T) { + content := []byte(`content`) + replacedContent := []byte(`replaced content`) + content1 := []byte(`content1`) + content2 := []byte(`content2`) + commonName := "robot" + filename1 := "b" + filename2 := "c" + + files1 := map[string][]byte{ + filename1: content1, + commonName: content, + } + rootDir1 := "1" + fc1 := loadfile.NewFileCache(rootDir1, files1) + + files2 := map[string][]byte{ + commonName: replacedContent, + filename2: content2, + } + expRootDir := "2" + fc2 := loadfile.NewFileCache(expRootDir, files2) + + expMergedFiles := map[string]loadfile.ContextFile{ + filename1: { + ID: filename1, + AbsolutePath: filename1, + Content: content1, + }, + filename2: { + ID: filename2, + AbsolutePath: filename2, + Content: content2, + }, + commonName: { + ID: commonName, + AbsolutePath: commonName, + Content: replacedContent, + }, + } + + fcMerged := loadfile.MergeFileCaches(fc1, fc2) + assert.Equals(t, fcMerged.RootDir(), expRootDir) + assert.Equals(t, fcMerged.Files(), expMergedFiles) +} - t.Cleanup(func() { - assert.NoError(t, os.RemoveAll(testdir)) - }) +func FileCacheAbsPaths(fc loadfile.FileCache) map[string]string { + result := map[string]string{} + for key, f := range fc.Files() { + result[key] = f.AbsolutePath + } + return result +} + +var TestDir = filepath.Join(os.TempDir(), "loadfile-tests") + +func TestMain(m *testing.M) { + // cleanup directory even if it's there + _ = os.RemoveAll(TestDir) + err := os.MkdirAll(TestDir, os.ModePerm) + if err != nil { + log.Fatalf("failed to make directory %s %v", TestDir, err) + } + exitCode := m.Run() + _ = os.RemoveAll(TestDir) + os.Exit(exitCode) }