Skip to content

Commit

Permalink
try with workflow result
Browse files Browse the repository at this point in the history
  • Loading branch information
mfleader committed Aug 15, 2023
1 parent faaf520 commit b801630
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 44 deletions.
127 changes: 83 additions & 44 deletions cmd/arcaflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ Options:
//signal.Notify(sigs, os.Interrupt, os.Kill)
////signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
//
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
//ctx, cancel := context.WithCancel(context.Background())
//defer cancel()
//
//logger.Infof("Starting workflow")
//var exitCode int
Expand Down Expand Up @@ -240,24 +240,21 @@ Options:
//}
//exitCode := osSignalToInt(sig)

outch := runWorkflow(ctx, flow, dirContext, workflowFile, logger, inputData)
exitCode := <-outch
defer close(outch)
exitCode := runWorkflow(flow, dirContext, workflowFile, logger, inputData)
logger.Infof("Got exit code %d", exitCode)
os.Exit(exitCode)
}

func runWorkflow(ctx context.Context, flow engine.WorkflowEngine, dirContext map[string][]byte, workflowFile string, logger log.Logger, inputData []byte) chan int {
//ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workflowFile string, logger log.Logger, inputData []byte) int {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigs := make(chan os.Signal, 1)
defer close(sigs)
signal.Notify(sigs, os.Interrupt, os.Kill)

Check failure on line 254 in cmd/arcaflow/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA1016: os.Kill cannot be trapped (did you mean syscall.SIGTERM?) (staticcheck)

out := make(chan int, 2)
//defer close(out)
exitcode := make(chan int, 1)
defer close(exitcode)

//var exitCode int
//exitCode := -1
Expand All @@ -269,51 +266,93 @@ func runWorkflow(ctx context.Context, flow engine.WorkflowEngine, dirContext map
logger.Infof("Caught signal %s", sig)
switch sig {
case os.Interrupt:
out <- ExitCodeUserInterrupt
exitcode <- ExitCodeUserInterrupt
case os.Kill:
out <- ExitCodeUserKill
exitcode <- ExitCodeUserKill
}
cancel()
//ctx.Done()
case <-ctx.Done():
// Done. No sigint.
logger.Infof("Main context done")
out <- ExitCodeOK
exitcode <- ExitCodeOK
}
}()

workflow, err := flow.Parse(dirContext, workflowFile)
if err != nil {
logger.Errorf("Invalid workflow (%v)", err)
out <- ExitCodeInvalidData
return out
}
//resultch := make(chan engine.WorkflowResult)
//defer close(resultch)

outputID, outputData, outputError, err := workflow.Run(ctx, inputData)
if err != nil {
logger.Errorf("Workflow execution failed (%v)", err)
out <- ExitCodeWorkflowFailed
return out
}
data, err := yaml.Marshal(
map[string]any{
"output_id": outputID,
"output_data": outputData,
},
)
if err != nil {
logger.Errorf("Failed to marshal output (%v)", err)
out <- ExitCodeInvalidData
return out
}
_, _ = os.Stdout.Write(data)
if outputError {
out <- ExitCodeWorkflowErrorOutput
return out
}
go func() {
workflow, err := flow.Parse(dirContext, workflowFile)
if err != nil {
logger.Errorf("Invalid workflow (%v)", err)
exitcode <- ExitCodeInvalidData
return
}

// allow workflow to finish, so that artifacts can be cleaned up
outputID, outputData, outputIsError, err := workflow.Run(ctx, inputData)
//resultch <- engine.WorkflowResult{
// OutputID: outputID,
// OutputData: outputData,
// OutputIsError: outputIsError,
// Err: err,
//}

if err != nil {
logger.Errorf("Workflow execution failed (%v)", err)
exitcode <- ExitCodeWorkflowFailed
return
}
data, err := yaml.Marshal(
map[string]any{
"output_id": outputID,
"output_data": outputData,
},
)
if err != nil {
logger.Errorf("Failed to marshal output (%v)", err)
exitcode <- ExitCodeInvalidData
return
}
_, _ = os.Stdout.Write(data)
if outputIsError {
exitcode <- ExitCodeWorkflowErrorOutput
return
}

exitcode <- ExitCodeOK
}()

//select {
//case result := <-resultch:
// if result.Err != nil {
// logger.Errorf("Workflow execution failed (%v)", result.Err)
// exitcode <- ExitCodeWorkflowFailed
// return <-exitcode
// }
// data, err := yaml.Marshal(
// map[string]any{
// "output_id": result.OutputID,
// "output_data": result.OutputData,
// },
// )
// if err != nil {
// logger.Errorf("Failed to marshal output (%v)", err)
// exitcode <- ExitCodeInvalidData
// return <-exitcode
// }
// _, _ = os.Stdout.Write(data)
// if result.OutputIsError {
// exitcode <- ExitCodeWorkflowErrorOutput
// return <-exitcode
// }
//
// exitcode <- ExitCodeOK
//case <-ctx.Done():
// // all done
//}

out <- ExitCodeOK
return out
return <-exitcode
}

func loadContext(dir string) (map[string][]byte, error) {
Expand Down
7 changes: 7 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type WorkflowEngine interface {
)
}

type WorkflowResult struct {

Check warning on line 39 in engine.go

View workflow job for this annotation

GitHub Actions / golangci-lint

exported: exported type WorkflowResult should have comment or be unexported (revive)
OutputID string
OutputData any
OutputIsError bool
Err error
}

// Workflow is a runnable, queryable workflow. You can execute it, or query it for schema information.
type Workflow interface {
// Run executes the workflow with the passed, YAML-formatted input data.
Expand Down

0 comments on commit b801630

Please sign in to comment.