diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 45537d70..d2cd71b8 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -198,8 +198,8 @@ Options: //signal.Notify(sigs, os.Interrupt, os.Kill) ////signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) // - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + //ctx, cancel := context.WithCancel(context.Background()) + //defer cancel() // //logger.Infof("Starting workflow") //var exitCode int @@ -240,24 +240,21 @@ Options: //} //exitCode := osSignalToInt(sig) - outch := runWorkflow(ctx, flow, dirContext, workflowFile, logger, inputData) - exitCode := <-outch - defer close(outch) + exitCode := runWorkflow(flow, dirContext, workflowFile, logger, inputData) logger.Infof("Got exit code %d", exitCode) os.Exit(exitCode) } -func runWorkflow(ctx context.Context, flow engine.WorkflowEngine, dirContext map[string][]byte, workflowFile string, logger log.Logger, inputData []byte) chan int { - //ctx, cancel := context.WithCancel(context.Background()) - ctx, cancel := context.WithCancel(ctx) +func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workflowFile string, logger log.Logger, inputData []byte) int { + ctx, cancel := context.WithCancel(context.Background()) defer cancel() sigs := make(chan os.Signal, 1) defer close(sigs) signal.Notify(sigs, os.Interrupt, os.Kill) - out := make(chan int, 2) - //defer close(out) + exitcode := make(chan int, 1) + defer close(exitcode) //var exitCode int //exitCode := -1 @@ -269,51 +266,93 @@ func runWorkflow(ctx context.Context, flow engine.WorkflowEngine, dirContext map logger.Infof("Caught signal %s", sig) switch sig { case os.Interrupt: - out <- ExitCodeUserInterrupt + exitcode <- ExitCodeUserInterrupt case os.Kill: - out <- ExitCodeUserKill + exitcode <- ExitCodeUserKill } cancel() - //ctx.Done() case <-ctx.Done(): // Done. No sigint. logger.Infof("Main context done") - out <- ExitCodeOK + exitcode <- ExitCodeOK } }() - workflow, err := flow.Parse(dirContext, workflowFile) - if err != nil { - logger.Errorf("Invalid workflow (%v)", err) - out <- ExitCodeInvalidData - return out - } + //resultch := make(chan engine.WorkflowResult) + //defer close(resultch) - outputID, outputData, outputError, err := workflow.Run(ctx, inputData) - if err != nil { - logger.Errorf("Workflow execution failed (%v)", err) - out <- ExitCodeWorkflowFailed - return out - } - data, err := yaml.Marshal( - map[string]any{ - "output_id": outputID, - "output_data": outputData, - }, - ) - if err != nil { - logger.Errorf("Failed to marshal output (%v)", err) - out <- ExitCodeInvalidData - return out - } - _, _ = os.Stdout.Write(data) - if outputError { - out <- ExitCodeWorkflowErrorOutput - return out - } + go func() { + workflow, err := flow.Parse(dirContext, workflowFile) + if err != nil { + logger.Errorf("Invalid workflow (%v)", err) + exitcode <- ExitCodeInvalidData + return + } + + // allow workflow to finish, so that artifacts can be cleaned up + outputID, outputData, outputIsError, err := workflow.Run(ctx, inputData) + //resultch <- engine.WorkflowResult{ + // OutputID: outputID, + // OutputData: outputData, + // OutputIsError: outputIsError, + // Err: err, + //} + + if err != nil { + logger.Errorf("Workflow execution failed (%v)", err) + exitcode <- ExitCodeWorkflowFailed + return + } + data, err := yaml.Marshal( + map[string]any{ + "output_id": outputID, + "output_data": outputData, + }, + ) + if err != nil { + logger.Errorf("Failed to marshal output (%v)", err) + exitcode <- ExitCodeInvalidData + return + } + _, _ = os.Stdout.Write(data) + if outputIsError { + exitcode <- ExitCodeWorkflowErrorOutput + return + } + + exitcode <- ExitCodeOK + }() + + //select { + //case result := <-resultch: + // if result.Err != nil { + // logger.Errorf("Workflow execution failed (%v)", result.Err) + // exitcode <- ExitCodeWorkflowFailed + // return <-exitcode + // } + // data, err := yaml.Marshal( + // map[string]any{ + // "output_id": result.OutputID, + // "output_data": result.OutputData, + // }, + // ) + // if err != nil { + // logger.Errorf("Failed to marshal output (%v)", err) + // exitcode <- ExitCodeInvalidData + // return <-exitcode + // } + // _, _ = os.Stdout.Write(data) + // if result.OutputIsError { + // exitcode <- ExitCodeWorkflowErrorOutput + // return <-exitcode + // } + // + // exitcode <- ExitCodeOK + //case <-ctx.Done(): + // // all done + //} - out <- ExitCodeOK - return out + return <-exitcode } func loadContext(dir string) (map[string][]byte, error) { diff --git a/engine.go b/engine.go index b5dd6f53..f1c46127 100644 --- a/engine.go +++ b/engine.go @@ -36,6 +36,13 @@ type WorkflowEngine interface { ) } +type WorkflowResult struct { + OutputID string + OutputData any + OutputIsError bool + Err error +} + // Workflow is a runnable, queryable workflow. You can execute it, or query it for schema information. type Workflow interface { // Run executes the workflow with the passed, YAML-formatted input data.