From 2ae400f23451294774f933f7ba954755d0ed5896 Mon Sep 17 00:00:00 2001 From: Jared O'Connell Date: Tue, 7 Nov 2023 17:09:34 -0500 Subject: [PATCH 1/6] Added graceful shutdown for ctrl-C --- cmd/arcaflow/main.go | 31 ++++++++++++++++++++++++++++++- engine.go | 12 ++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 967ebf90..72c2f88d 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "os" + "os/signal" "path/filepath" "strings" @@ -164,13 +165,21 @@ Options: func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workflowFile string, logger log.Logger, inputData []byte) int { ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctrlC := make(chan os.Signal, 1) + signal.Notify(ctrlC, os.Interrupt) + + go handleOSInterrupt(ctrlC, cancel, logger) + defer func() { + close(ctrlC) // Ensure that the goroutine exits + cancel() + }() workflow, err := flow.Parse(dirContext, workflowFile) if err != nil { logger.Errorf("Invalid workflow (%v)", err) return ExitCodeInvalidData } + outputID, outputData, outputError, err := workflow.Run(ctx, inputData) if err != nil { logger.Errorf("Workflow execution failed (%v)", err) @@ -193,6 +202,26 @@ func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workf return ExitCodeOK } +func handleOSInterrupt(ctrlC chan os.Signal, cancel context.CancelFunc, logger log.Logger) { + for i := 1; true; i++ { + sysSignal, ok := <-ctrlC + if !ok || sysSignal != os.Interrupt { + return + } + if i == 1 { + logger.Infof("Requesting graceful shutdown.") + // Request graceful shutdown + cancel() + } else if i == 2 { + logger.Warningf("Hit CTRL-C again to forcefully exit workflow without cleanup. You may need to manually delete pods or containers.") + } else { + logger.Warningf("Force exiting. You may need to manually delete pods or containers.") + // Second request. Exit now. 
+ os.Exit(1) + } + } +} + func loadContext(dir string) (map[string][]byte, error) { absDir, err := filepath.Abs(dir) if err != nil { diff --git a/engine.go b/engine.go index d82687a3..0411a9dc 100644 --- a/engine.go +++ b/engine.go @@ -62,7 +62,12 @@ type workflowEngine struct { config *config.Config } -func (w workflowEngine) RunWorkflow(ctx context.Context, input []byte, workflowContext map[string][]byte, workflowFileName string) (outputID string, outputData any, outputError bool, err error) { +func (w workflowEngine) RunWorkflow( + ctx context.Context, + input []byte, + workflowContext map[string][]byte, + workflowFileName string, +) (outputID string, outputData any, outputError bool, err error) { wf, err := w.Parse(workflowContext, workflowFileName) if err != nil { return "", nil, true, err @@ -126,7 +131,10 @@ type engineWorkflow struct { workflow workflow.ExecutableWorkflow } -func (e engineWorkflow) Run(ctx context.Context, input []byte) (outputID string, outputData any, outputIsError bool, err error) { +func (e engineWorkflow) Run( + ctx context.Context, + input []byte, +) (outputID string, outputData any, outputIsError bool, err error) { decodedInput, err := yaml.New().Parse(input) if err != nil { return "", nil, true, fmt.Errorf("failed to YAML decode input (%w)", err) From 9166b34146bfb882f8389c3e34eca155cb785f0a Mon Sep 17 00:00:00 2001 From: Jared O'Connell Date: Tue, 7 Nov 2023 17:28:17 -0500 Subject: [PATCH 2/6] Switched to switch statement --- cmd/arcaflow/main.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 72c2f88d..00bb92e3 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -208,13 +208,14 @@ func handleOSInterrupt(ctrlC chan os.Signal, cancel context.CancelFunc, logger l if !ok || sysSignal != os.Interrupt { return } - if i == 1 { + switch { + case i <= 1: logger.Infof("Requesting graceful shutdown.") // Request graceful shutdown cancel() - } else if i == 
2 { + case i == 2: logger.Warningf("Hit CTRL-C again to forcefully exit workflow without cleanup. You may need to manually delete pods or containers.") - } else { + default: logger.Warningf("Force exiting. You may need to manually delete pods or containers.") // Second request. Exit now. os.Exit(1) From efbbf23694e9cfa40843bf28f440a82e4d755278 Mon Sep 17 00:00:00 2001 From: Jared O'Connell Date: Wed, 8 Nov 2023 11:14:33 -0500 Subject: [PATCH 3/6] Added test for context done cancellation --- workflow/workflow_test.go | 61 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 330b7ed0..fc38d690 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -493,3 +493,64 @@ func TestMissingInputsWrongOutput(t *testing.T) { assert.Error(t, err) assert.Equals(t, outputID, "") } + +var fiveSecWaitWorkflowDefinition = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 5000 +outputs: + success: + first_step_output: !expr $.steps.long_wait.outputs +` + +func TestEarlyContextCancellation(t *testing.T) { + // For this test, a workflow runs a single wait step configured to wait for 5s. + // The context passed to Execute times out after 3 ms, which simulates + // a cancellation such as the one triggered by ctrl-C. + // The workflow is expected to stop early instead of waiting the full 5s, so + // the total execution time for this test function should be well under 1 second. + // The test double deployer will be used for this test, as we + // need a deployer to test the plugin step provider. 
+ logConfig := log.Config{ + Level: log.LevelInfo, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + stepRegistry := NewTestImplStepRegistry(logger, t) + + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(fiveSecWaitWorkflowDefinition))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) + // Cancel the context after 3 ms to simulate cancellation with ctrl-c. + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3) + startTime := time.Now() // Right before execute to not include pre-processing time. + _, _, _ = preparedWorkflow.Execute(ctx, map[string]any{}) + cancel() + + duration := time.Since(startTime) + t.Logf("Test execution time: %s", duration) + if duration >= 1000*time.Millisecond { + t.Fatalf("Test execution time is greater than 1000 milliseconds; Is the workflow properly cancelling?") + } +} From 1858a12b2a7559f3b233172f40ba574aad656968 Mon Sep 17 00:00:00 2001 From: Jared O'Connell Date: Wed, 8 Nov 2023 11:24:39 -0500 Subject: [PATCH 4/6] Ignore linting This error does not happen locally. Just on CI --- workflow/workflow_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index fc38d690..350cca7c 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -545,6 +545,7 @@ func TestEarlyContextCancellation(t *testing.T) { // Cancel the context after 3 ms to simulate cancellation with ctrl-c. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3) startTime := time.Now() // Right before execute to not include pre-processing time. 
+ //nolint:dogsled _, _, _ = preparedWorkflow.Execute(ctx, map[string]any{}) cancel() From 7a45b63c83c27888986e1144e362cefd8315655f Mon Sep 17 00:00:00 2001 From: Jared O'Connell Date: Wed, 8 Nov 2023 12:27:15 -0500 Subject: [PATCH 5/6] Address review comments --- cmd/arcaflow/main.go | 35 ++++++++++++++++++----------------- cmd/run-plugin/run.go | 2 +- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 00bb92e3..28b76928 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -203,24 +203,25 @@ func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workf } func handleOSInterrupt(ctrlC chan os.Signal, cancel context.CancelFunc, logger log.Logger) { - for i := 1; true; i++ { - sysSignal, ok := <-ctrlC - if !ok || sysSignal != os.Interrupt { - return - } - switch { - case i <= 1: - logger.Infof("Requesting graceful shutdown.") - // Request graceful shutdown - cancel() - case i == 2: - logger.Warningf("Hit CTRL-C again to forcefully exit workflow without cleanup. You may need to manually delete pods or containers.") - default: - logger.Warningf("Force exiting. You may need to manually delete pods or containers.") - // Second request. Exit now. - os.Exit(1) - } + _, ok := <-ctrlC + if !ok { + return + } + logger.Infof("Requesting graceful shutdown.") + cancel() + + _, ok = <-ctrlC + if !ok { + return + } + logger.Warningf("Hit CTRL-C again to forcefully exit workflow without cleanup. You may need to manually delete pods or containers.") + + _, ok = <-ctrlC + if !ok { + return } + logger.Warningf("Force exiting. 
You may need to manually delete pods or containers.") + os.Exit(1) } func loadContext(dir string) (map[string][]byte, error) { diff --git a/cmd/run-plugin/run.go b/cmd/run-plugin/run.go index f3a19138..86e2edeb 100644 --- a/cmd/run-plugin/run.go +++ b/cmd/run-plugin/run.go @@ -91,7 +91,7 @@ func main() { if err != nil { panic(err) } - ctrlC := make(chan os.Signal, 1) + ctrlC := make(chan os.Signal, 4) // We expect up to four ctrl-C inputs. signal.Notify(ctrlC, os.Interrupt) // Set up the signal channel to send cancel signal on ctrl-c From 588f9ea392afe993bcf5ecafe92ad6bc029c7d6e Mon Sep 17 00:00:00 2001 From: Jared O'Connell Date: Wed, 8 Nov 2023 12:51:58 -0500 Subject: [PATCH 6/6] Fix wrong channel edited, and improved comments --- cmd/arcaflow/main.go | 2 +- cmd/run-plugin/run.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 28b76928..b87ca800 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -165,7 +165,7 @@ Options: func runWorkflow(flow engine.WorkflowEngine, dirContext map[string][]byte, workflowFile string, logger log.Logger, inputData []byte) int { ctx, cancel := context.WithCancel(context.Background()) - ctrlC := make(chan os.Signal, 1) + ctrlC := make(chan os.Signal, 4) // We expect up to three ctrl-C inputs. Plus one extra to buffer in case. signal.Notify(ctrlC, os.Interrupt) go handleOSInterrupt(ctrlC, cancel, logger) diff --git a/cmd/run-plugin/run.go b/cmd/run-plugin/run.go index 86e2edeb..7675b704 100644 --- a/cmd/run-plugin/run.go +++ b/cmd/run-plugin/run.go @@ -91,7 +91,7 @@ func main() { if err != nil { panic(err) } - ctrlC := make(chan os.Signal, 4) // We expect up to four ctrl-C inputs. + ctrlC := make(chan os.Signal, 1) // Buffer of one to properly buffer if the signal is sent early. signal.Notify(ctrlC, os.Interrupt) // Set up the signal channel to send cancel signal on ctrl-c