From bb4ab63abb482808629949de61cf444a25f94436 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 12:36:22 +0900 Subject: [PATCH 1/9] digraph: add new condition field --- internal/digraph/builder.go | 76 +++++++++++++++----- internal/digraph/condition.go | 5 +- internal/digraph/errors.go | 4 ++ internal/digraph/scheduler/scheduler_test.go | 50 ++++++++++--- internal/digraph/spec.go | 4 +- 5 files changed, 108 insertions(+), 31 deletions(-) diff --git a/internal/digraph/builder.go b/internal/digraph/builder.go index b24c8133d..dbee1ab47 100644 --- a/internal/digraph/builder.go +++ b/internal/digraph/builder.go @@ -64,7 +64,7 @@ var builderRegistry = []builderEntry{ {name: "infoMailConfig", fn: buildInfoMailConfig}, {name: "maxHistoryRetentionDays", fn: maxHistoryRetentionDays}, {name: "maxCleanUpTime", fn: maxCleanUpTime}, - {name: "preconditions", fn: buildPreconditions}, + {name: "preconditions", fn: buildPrecondition}, } type builderEntry struct { @@ -81,6 +81,7 @@ var stepBuilderRegistry = []stepBuilderEntry{ {name: "retryPolicy", fn: buildRetryPolicy}, {name: "repeatPolicy", fn: buildRepeatPolicy}, {name: "signalOnStop", fn: buildSignalOnStop}, + {name: "precondition", fn: buildStepPrecondition}, } type stepBuilderEntry struct { @@ -347,11 +348,57 @@ func buildHandlers(ctx BuildContext, spec *definition, dag *DAG) (err error) { return nil } -func buildPreconditions(_ BuildContext, spec *definition, dag *DAG) error { - dag.Preconditions = buildConditions(spec.Preconditions) +func buildPrecondition(ctx BuildContext, spec *definition, dag *DAG) error { + conditions, err := parsePrecondition(ctx, spec.Preconditions) + if err != nil { + return err + } + dag.Preconditions = conditions return nil } +func parsePrecondition(ctx BuildContext, precondition any) ([]Condition, error) { + switch v := precondition.(type) { + case nil: + return nil, nil + case map[any]any: + var ret Condition + for k, vv := range v { + key, ok := k.(string) + if !ok { + return nil, wrapError("preconditions", k, errPreconditionKeyMustBeString) + } + switch key { + case "condition": + ret.Condition, ok = vv.(string) + if !ok { + return nil, wrapError("preconditions", vv, errPreconditionValueMustBeString) + } + case "expected": + ret.Expected, ok = vv.(string) + if !ok { + return nil, wrapError("preconditions", vv, errPreconditionValueMustBeString) + } + default: + return nil, wrapError("preconditions", k, fmt.Errorf("%w: %s", errPreconditionHasInvalidKey, key)) + } + } + return []Condition{ret}, nil + case []any: + var ret []Condition + for _, vv := range v { + parsed, err := parsePrecondition(ctx, vv) + if err != nil { + return nil, err + } + ret = append(ret, parsed...) + } + return ret, nil + default: + return nil, wrapError("preconditions", v, errPreconditionMustBeArrayOrString) + } +} + func maxCleanUpTime(_ BuildContext, spec *definition, dag *DAG) error { if spec.MaxCleanUpTimeSec != nil { dag.MaxCleanUpTime = time.Second * time.Duration(*spec.MaxCleanUpTimeSec) @@ -439,7 +486,6 @@ func buildStep(ctx BuildContext, def stepDef, fns []*funcDef) (*Step, error) { Dir: def.Dir, Depends: def.Depends, MailOnError: def.MailOnError, - Preconditions: buildConditions(def.Preconditions), ExecutorConfig: ExecutorConfig{Config: make(map[string]any)}, } @@ -497,6 +543,15 @@ func buildRepeatPolicy(_ BuildContext, def stepDef, step *Step) error { return nil } +func buildStepPrecondition(ctx BuildContext, def stepDef, step *Step) error { + conditions, err := parsePrecondition(ctx, def.Preconditions) + if err != nil { + return err + } + step.Preconditions = conditions + return nil +} + func buildSignalOnStop(_ BuildContext, def stepDef, step *Step) error { if def.SignalOnStop != nil { sigDef := *def.SignalOnStop @@ -669,19 +724,6 @@ func parseKey(value any) (string, error) { return val, nil } -// buildConditions builds a list of conditions from the definition. -func buildConditions(cond []*conditionDef) []Condition { - var ret []Condition - for _, v := range cond { - ret = append(ret, Condition{ - Condition: v.Condition, - Expected: v.Expected, - }) - } - - return ret -} - // extractParamNames extracts a slice of parameter names by removing the '$' // from the command string. func extractParamNames(command string) []string { diff --git a/internal/digraph/condition.go b/internal/digraph/condition.go index 58f707417..27d8272ed 100644 --- a/internal/digraph/condition.go +++ b/internal/digraph/condition.go @@ -10,8 +10,9 @@ import ( // The condition can be a command substitution or an environment variable. // The expected value must be a string without any substitutions. type Condition struct { - Condition string // Condition to evaluate - Expected string // Expected value + Command string `json:"Command,omitempty"` // Command to evaluate + Condition string `json:"Condition,omitempty"` // Condition to evaluate + Expected string `json:"Expected,omitempty"` // Expected value } // eval evaluates the condition and returns the actual value. diff --git a/internal/digraph/errors.go b/internal/digraph/errors.go index 09869231e..f3a5df1f1 100644 --- a/internal/digraph/errors.go +++ b/internal/digraph/errors.go @@ -59,6 +59,10 @@ var ( errExecutorHasInvalidKey = errors.New("executor has invalid key") errExecutorConfigMustBeStringOrMap = errors.New("executor config must be string or map") errDotenvMustBeStringOrArray = errors.New("dotenv must be a string or an array of strings") + errPreconditionMustBeArrayOrString = errors.New("precondition must be a string or an array of strings") + errPreconditionKeyMustBeString = errors.New("precondition key must be a string") + errPreconditionValueMustBeString = errors.New("precondition value must be a string") + errPreconditionHasInvalidKey = errors.New("precondition has invalid key") ) // errorList is just a list of errors. diff --git a/internal/digraph/scheduler/scheduler_test.go b/internal/digraph/scheduler/scheduler_test.go index 52ea98bab..3e1621295 100644 --- a/internal/digraph/scheduler/scheduler_test.go +++ b/internal/digraph/scheduler/scheduler_test.go @@ -124,7 +124,10 @@ func TestScheduler(t *testing.T) { newStep("2", withDepends("1"), withCommand("false"), - withPrecondition("`echo 1`", "0"), + withPrecondition(digraph.Condition{ + Condition: "`echo 1`", + Expected: "0", + }), withContinueOnSkipped(), ), successStep("3", "2"), @@ -238,7 +241,12 @@ func TestScheduler(t *testing.T) { // 1 -> 2 (precondition match) -> 3 graph := sc.newGraph(t, successStep("1"), - newStep("2", withCommand("echo 2"), withPrecondition("`echo 1`", "1")), + newStep("2", withCommand("echo 2"), + withPrecondition(digraph.Condition{ + Condition: "`echo 1`", + Expected: "1", + }), + ), successStep("3", "2"), ) @@ -255,7 +263,34 @@ func TestScheduler(t *testing.T) { // 1 -> 2 (precondition not match) -> 3 graph := sc.newGraph(t, successStep("1"), - newStep("2", withCommand("echo 2"), withPrecondition("`echo 1`", "0")), + newStep("2", withCommand("echo 2"), + withPrecondition(digraph.Condition{ + Condition: "`echo 1`", + Expected: "0", + })), + successStep("3", "2"), + ) + + result := graph.Schedule(t, scheduler.StatusSuccess) + + result.AssertDoneCount(t, 1) // only 1 should + + // 1 should be executed and 2, 3 should be skipped + result.AssertNodeStatus(t, "1", scheduler.NodeStatusSuccess) + result.AssertNodeStatus(t, "2", scheduler.NodeStatusSkipped) + result.AssertNodeStatus(t, "3", scheduler.NodeStatusSkipped) + }) + t.Run("PreconditionWithCommand", func(t *testing.T) { + sc := setup(t) + + // 1 -> 2 (precondition not match) -> 3 + graph := sc.newGraph(t, + successStep("1"), + newStep("2", withCommand("echo 2"), + withPrecondition(digraph.Condition{ + Condition: "`echo 1`", + Expected: "0", + })), successStep("3", "2"), ) @@ -604,14 +639,9 @@ func withRepeatPolicy(repeat bool, interval time.Duration) stepOption { } } -func withPrecondition(condition, expected string) stepOption { +func withPrecondition(condition digraph.Condition) stepOption { return func(step *digraph.Step) { - step.Preconditions = []digraph.Condition{ - { - Condition: condition, - Expected: expected, - }, - } + step.Preconditions = []digraph.Condition{condition} } } diff --git a/internal/digraph/spec.go b/internal/digraph/spec.go index 88eeef9dd..06ee8d11a 100644 --- a/internal/digraph/spec.go +++ b/internal/digraph/spec.go @@ -44,7 +44,7 @@ type definition struct { // HistRetentionDays is the retention days of the history. HistRetentionDays *int // Preconditions is the condition to run the DAG. - Preconditions []*conditionDef + Preconditions any // MaxActiveRuns is the maximum number of concurrent steps. MaxActiveRuns int // Params is the default parameters for the steps. @@ -104,7 +104,7 @@ type stepDef struct { // MailOnError is the flag to send mail on error. MailOnError bool // Preconditions is the condition to run the step. - Preconditions []*conditionDef + Preconditions any // SignalOnStop is the signal when the step is requested to stop. // When it is empty, the same signal as the parent process is sent. // It can be KILL when the process does not stop over the timeout. From c242d071b8db18605bf44b865e6fa0dca9b78afb Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 19:36:38 +0900 Subject: [PATCH 2/9] refactor condition --- internal/digraph/condition.go | 35 ++++++++++++++++---- internal/digraph/scheduler/scheduler_test.go | 3 +- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/internal/digraph/condition.go b/internal/digraph/condition.go index 27d8272ed..67a3037e0 100644 --- a/internal/digraph/condition.go +++ b/internal/digraph/condition.go @@ -17,26 +17,47 @@ type Condition struct { // eval evaluates the condition and returns the actual value. // It returns an error if the evaluation failed or the condition is invalid. -func (c Condition) eval(ctx context.Context) (string, error) { +func (c Condition) eval(ctx context.Context) (bool, error) { + switch { + case c.Condition != "": + return c.evalCondition(ctx) + default: + return false, fmt.Errorf("invalid condition: Condition=%s", c.Condition) + } +} + +func (c Condition) evalCondition(ctx context.Context) (bool, error) { if IsStepContext(ctx) { - return GetStepContext(ctx).EvalString(c.Condition) + evaluatedVal, err := GetStepContext(ctx).EvalString(c.Condition) + if err != nil { + return false, err + } + return c.Expected == evaluatedVal, nil } - return GetContext(ctx).EvalString(c.Condition) + evaluatedVal, err := GetContext(ctx).EvalString(c.Condition) + if err != nil { + return false, err + } + return c.Expected == evaluatedVal, nil +} + +func (c Condition) String() string { + return fmt.Sprintf("Condition=%s Expected=%s", c.Condition, c.Expected) } // evalCondition evaluates a single condition and checks the result. // It returns an error if the condition was not met. func evalCondition(ctx context.Context, c Condition) error { - actual, err := c.eval(ctx) + matched, err := c.eval(ctx) if err != nil { return fmt.Errorf("failed to evaluate condition: Condition=%s Error=%v", c.Condition, err) } - - if c.Expected != actual { - return fmt.Errorf("error condition was not met: Condition=%s Expected=%s Actual=%s", c.Condition, c.Expected, actual) + if !matched { + return fmt.Errorf("error condition was not met: Condition=%s Expected=%s", c.Condition, c.Expected) } + // Condition was met return nil } diff --git a/internal/digraph/scheduler/scheduler_test.go b/internal/digraph/scheduler/scheduler_test.go index 3e1621295..b52724670 100644 --- a/internal/digraph/scheduler/scheduler_test.go +++ b/internal/digraph/scheduler/scheduler_test.go @@ -288,8 +288,7 @@ func TestScheduler(t *testing.T) { successStep("1"), newStep("2", withCommand("echo 2"), withPrecondition(digraph.Condition{ - Condition: "`echo 1`", - Expected: "0", + Command: "true", })), successStep("3", "2"), ) From 0597c81133049c35cf72d69716643295fea39fe6 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 22:28:15 +0900 Subject: [PATCH 3/9] digraph: wip: command parsing --- internal/cmdutil/cmdutil.go | 56 ++++++++++++++++++++++- internal/cmdutil/cmdutil_test.go | 59 +++++++++++++++++++++++++ internal/digraph/condition.go | 12 +++++ internal/digraph/executor/command.go | 32 ++++++++++---- internal/digraph/scheduler/node_test.go | 24 +++++++++- internal/digraph/step_context.go | 2 +- 6 files changed, 172 insertions(+), 13 deletions(-) diff --git a/internal/cmdutil/cmdutil.go b/internal/cmdutil/cmdutil.go index 10f058c9c..537be0f08 100644 --- a/internal/cmdutil/cmdutil.go +++ b/internal/cmdutil/cmdutil.go @@ -112,8 +112,6 @@ func SplitCommandWithEval(cmd string) (string, []string, error) { if err != nil { return "", nil, fmt.Errorf("failed to substitute command: %w", err) } - // unescape the command - // command[i] = unescapeReplacer.Replace(command[i]) } } @@ -146,6 +144,12 @@ var ( `\r`, `\\\\r`, `\n`, `\\\\n`, ) + + unescapeReplacer = strings.NewReplacer( + `\\\\t`, `\t`, + `\\\\r`, `\r`, + `\\\\n`, `\n`, + ) ) func SplitCommand(cmd string) (string, []string, error) { @@ -240,6 +244,54 @@ func WithVariables(vars map[string]string) EvalOption { } } +// RunExitCode runs the command and returns the exit code. +// func RunExitCode(cmd string) (int, error) { +// command, args, err := SplitCommandWithEval(cmd) +// if err != nil { +// return 0, err +// } +// shellCommand := GetShellCommand("") +// } + +var regEscapedKeyValue = regexp.MustCompile(`^[^\s=]+="[^"]+"$`) + +// BuildCommandEscapedString constructs a single shell-ready string from a command and its arguments. +// It assumes that the command and arguments are already escaped. +func BuildCommandEscapedString(command string, args []string) string { + quotedArgs := make([]string, 0, len(args)) + for _, arg := range args { + // If already quoted, skip + if strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) { + quotedArgs = append(quotedArgs, arg) + continue + } + if strings.HasPrefix(arg, `'`) && strings.HasSuffix(arg, `'`) { + quotedArgs = append(quotedArgs, arg) + continue + } + // If the argument contains spaces, quote it. + if strings.ContainsAny(arg, " ") { + // If it includes '=' and is already quoted, skip + if regEscapedKeyValue.MatchString(arg) { + quotedArgs = append(quotedArgs, arg) + continue + } + // if it contains double quotes, escape them + arg = strings.ReplaceAll(arg, `"`, `\"`) + quotedArgs = append(quotedArgs, fmt.Sprintf(`"%s"`, arg)) + } else { + quotedArgs = append(quotedArgs, arg) + } + } + + // If we have no arguments, just return the command without trailing space. + if len(quotedArgs) == 0 { + return command + } + + return fmt.Sprintf("%s %s", command, strings.Join(quotedArgs, " ")) +} + // EvalString substitutes environment variables and commands in the input string func EvalString(input string, opts ...EvalOption) (string, error) { options := &EvalOptions{} diff --git a/internal/cmdutil/cmdutil_test.go b/internal/cmdutil/cmdutil_test.go index bdb41f2d7..6e8bbdedc 100644 --- a/internal/cmdutil/cmdutil_test.go +++ b/internal/cmdutil/cmdutil_test.go @@ -401,3 +401,62 @@ func TestReplaceVars(t *testing.T) { }) } } + +// TestBuildCommandString demonstrates table-driven tests for BuildCommandString. +func TestBuildEscapedCommandString(t *testing.T) { + type testCase struct { + name string + cmd string + args []string + want string + } + + tests := []testCase{ + { + name: "piping", + cmd: "echo", + args: []string{"hello", "|", "wc", "-c"}, + want: "echo hello | wc -c", + }, + { + name: "redirection", + cmd: "echo", + args: []string{"'test content'", ">", "testfile.txt", "&&", "cat", "testfile.txt"}, + want: `echo 'test content' > testfile.txt && cat testfile.txt`, + }, + { + name: `key="value" argument`, + cmd: "echo", + args: []string{`key="value"`}, + want: `echo key="value"`, + }, + { + name: "JSON argument", + cmd: "echo", + args: []string{`{"foo":"bar","hello":"world"}`}, + want: `echo {"foo":"bar","hello":"world"}`, + }, + { + name: "key=value argument", + cmd: "echo", + args: []string{`key="some value"`}, + want: `echo key="some value"`, + }, + { + name: "double quotes", + cmd: "echo", + args: []string{`a "b" c`}, + want: `echo "a \"b\" c"`, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Build the final command line that will be passed to `sh -c`. + cmdStr := BuildCommandEscapedString(tc.cmd, tc.args) + + // Check if the built command string is as expected. + require.Equal(t, tc.want, cmdStr, "unexpected command string") + }) + } +} diff --git a/internal/digraph/condition.go b/internal/digraph/condition.go index 67a3037e0..e0b4a93b8 100644 --- a/internal/digraph/condition.go +++ b/internal/digraph/condition.go @@ -21,11 +21,21 @@ func (c Condition) eval(ctx context.Context) (bool, error) { switch { case c.Condition != "": return c.evalCondition(ctx) + default: return false, fmt.Errorf("invalid condition: Condition=%s", c.Condition) } } +// func (c Condition) evalCommand(ctx context.Context) (bool, error) { +// command, err := GetContext(ctx).EvalString(c.Command) +// if err !=nil { +// return false, err +// } +// // Run the command and get the exit code +// exitCode, err := cmdutil.WithVariables() +// } + func (c Condition) evalCondition(ctx context.Context) (bool, error) { if IsStepContext(ctx) { evaluatedVal, err := GetStepContext(ctx).EvalString(c.Condition) @@ -39,6 +49,7 @@ func (c Condition) evalCondition(ctx context.Context) (bool, error) { if err != nil { return false, err } + return c.Expected == evaluatedVal, nil } @@ -53,6 +64,7 @@ func evalCondition(ctx context.Context, c Condition) error { if err != nil { return fmt.Errorf("failed to evaluate condition: Condition=%s Error=%v", c.Condition, err) } + if !matched { return fmt.Errorf("error condition was not met: Condition=%s Expected=%s", c.Condition, c.Expected) } diff --git a/internal/digraph/executor/command.go b/internal/digraph/executor/command.go index 9295a06bf..3db8a05da 100644 --- a/internal/digraph/executor/command.go +++ b/internal/digraph/executor/command.go @@ -13,6 +13,7 @@ import ( "github.com/dagu-org/dagu/internal/cmdutil" "github.com/dagu-org/dagu/internal/digraph" "github.com/dagu-org/dagu/internal/fileutil" + "github.com/dagu-org/dagu/internal/logger" ) type commandExecutor struct { @@ -27,7 +28,10 @@ func newCommand(ctx context.Context, step digraph.Step) (Executor, error) { stepContext := digraph.GetStepContext(ctx) - cmd := createCommand(ctx, step) + cmd, err := createCommand(ctx, step) + if err != nil { + return nil, fmt.Errorf("failed to create command: %w", err) + } cmd.Env = append(cmd.Env, stepContext.AllEnvs()...) cmd.Dir = step.Dir @@ -39,24 +43,34 @@ func newCommand(ctx context.Context, step digraph.Step) (Executor, error) { return &commandExecutor{cmd: cmd}, nil } -func createCommand(ctx context.Context, step digraph.Step) *exec.Cmd { - shellCommand := cmdutil.GetShellCommand(step.Shell) +func createCommand(ctx context.Context, step digraph.Step) (*exec.Cmd, error) { + stepContext := digraph.GetStepContext(ctx) + var args []string + for _, arg := range step.Args { + ret, err := stepContext.EvalString(arg) + if err != nil { + logger.Error(ctx, "Failed to evaluate string", "arg", arg, "err", err) + return nil, err + } + args = append(args, ret) + } + shellCommand := cmdutil.GetShellCommand(step.Shell) if shellCommand == "" { - return createDirectCommand(ctx, step) + return createDirectCommand(ctx, step, args), nil } - return createShellCommand(ctx, shellCommand, step) + return createShellCommand(ctx, shellCommand, step, args), nil } // createDirectCommand creates a command that runs directly without a shell -func createDirectCommand(ctx context.Context, step digraph.Step) *exec.Cmd { +func createDirectCommand(ctx context.Context, step digraph.Step, args []string) *exec.Cmd { // nolint: gosec - return exec.CommandContext(ctx, step.Command, step.Args...) + return exec.CommandContext(ctx, step.Command, args...) } // createShellCommand creates a command that runs through a shell -func createShellCommand(ctx context.Context, shell string, step digraph.Step) *exec.Cmd { - command := buildCommandString(step.Command, step.Args) +func createShellCommand(ctx context.Context, shell string, step digraph.Step, args []string) *exec.Cmd { + command := cmdutil.BuildCommandEscapedString(step.Command, args) return exec.CommandContext(ctx, shell, "-c", command) } diff --git a/internal/digraph/scheduler/node_test.go b/internal/digraph/scheduler/node_test.go index bfb63c484..66dc02acc 100644 --- a/internal/digraph/scheduler/node_test.go +++ b/internal/digraph/scheduler/node_test.go @@ -67,6 +67,8 @@ func withNodeOutput(output string) nodeOption { } func setupNode(t *testing.T, opts ...nodeOption) nodeHelper { + t.Helper() + th := test.Setup(t) data := scheduler.NodeData{Step: digraph.Step{}} @@ -81,6 +83,8 @@ func setupNode(t *testing.T, opts ...nodeOption) nodeHelper { } func (n nodeHelper) Execute(t *testing.T) { + t.Helper() + err := n.Node.Setup(n.Context, n.Config.Paths.LogDir, n.reqID) require.NoError(t, err, "failed to setup node") @@ -92,22 +96,28 @@ func (n nodeHelper) Execute(t *testing.T) { } func (n nodeHelper) ExecuteFail(t *testing.T, expectedErr string) { + t.Helper() + err := n.Node.Execute(n.execContext()) require.Error(t, err, "expected error") require.Contains(t, err.Error(), expectedErr, "unexpected error") } func (n nodeHelper) AssertLogContains(t *testing.T, expected string) { + t.Helper() + dat, err := os.ReadFile(n.Node.LogFilename()) require.NoErrorf(t, err, "failed to read log file %q", n.Node.LogFilename()) require.Contains(t, string(dat), expected, "log file does not contain expected string") } func (n nodeHelper) AssertOutput(t *testing.T, key, value string) { + t.Helper() + require.NotNil(t, n.Node.Data().Step.OutputVariables, "output variables not set") data, ok := n.Node.Data().Step.OutputVariables.Load(key) require.True(t, ok, "output variable not found") - require.Equal(t, fmt.Sprintf("%s=%s", key, value), data, "output variable value mismatch") + require.Equal(t, fmt.Sprintf(`%s=%s`, key, value), data, "output variable value mismatch") } func (n nodeHelper) execContext() context.Context { @@ -226,6 +236,18 @@ func TestNode(t *testing.T) { CmdWithArgs: `echo "{\"key\":\"value\"}"`, Want: `{"key":"value"}`, }, + { + CmdWithArgs: `echo 'hello world'`, + Want: `hello world`, + }, + { + CmdWithArgs: `echo hello "world"`, + Want: `hello world`, + }, + { + CmdWithArgs: `echo 'hello "world"'`, + Want: `hello "world"`, + }, } for i, tc := range testCases { diff --git a/internal/digraph/step_context.go b/internal/digraph/step_context.go index 323360165..ecf59af11 100644 --- a/internal/digraph/step_context.go +++ b/internal/digraph/step_context.go @@ -61,7 +61,7 @@ func (c StepContext) MailerConfig() (mailer.Config, error) { }) } -func (c StepContext) EvalString(s string) (string, error) { +func (c StepContext) EvalString(s string, opts ...cmdutil.EvalOption) (string, error) { return cmdutil.EvalString(s, cmdutil.WithVariables(c.envs), cmdutil.WithVariables(c.outputVariables.Variables()), From 17a7bca6eea52d3d84edcdd969b70472a53ea356 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 22:43:20 +0900 Subject: [PATCH 4/9] digraph: wip: command parsing --- internal/cmdutil/cmdutil.go | 8 +------- internal/digraph/scheduler/node.go | 8 +++++++- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/cmdutil/cmdutil.go b/internal/cmdutil/cmdutil.go index 537be0f08..a25d2528f 100644 --- a/internal/cmdutil/cmdutil.go +++ b/internal/cmdutil/cmdutil.go @@ -105,7 +105,7 @@ func SplitCommandWithEval(cmd string) (string, []string, error) { } for i, arg := range command { command[i] = arg - // escape the command + // Escape the command command[i] = escapeReplacer.Replace(command[i]) // Substitute command in the command. command[i], err = SubstituteCommands(command[i]) @@ -144,12 +144,6 @@ var ( `\r`, `\\\\r`, `\n`, `\\\\n`, ) - - unescapeReplacer = strings.NewReplacer( - `\\\\t`, `\t`, - `\\\\r`, `\r`, - `\\\\n`, `\n`, - ) ) func SplitCommand(cmd string) (string, []string, error) { diff --git a/internal/digraph/scheduler/node.go b/internal/digraph/scheduler/node.go index ee1cd4cf7..28305f6ad 100644 --- a/internal/digraph/scheduler/node.go +++ b/internal/digraph/scheduler/node.go @@ -196,7 +196,13 @@ func (n *Node) SetupExec(ctx context.Context) (executor.Executor, error) { n.cancelFunc = fn if n.data.Step.CmdWithArgs != "" { - cmd, args, err := cmdutil.SplitCommandWithEval(n.data.Step.CmdWithArgs) + // Expand envs + stepContext := digraph.GetStepContext(ctx) + cmdWithArgs, err := stepContext.EvalString(n.data.Step.CmdWithArgs) + if err != nil { + return nil, err + } + cmd, args, err := cmdutil.SplitCommandWithEval(cmdWithArgs) if err != nil { return nil, fmt.Errorf("failed to split command: %w", err) } From ac1ca2a69a5ae7cb35ab14a5981f812b5e1e2d0d Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 22:47:14 +0900 Subject: [PATCH 5/9] fix test --- internal/digraph/condition.go | 6 ++++-- internal/digraph/condition_test.go | 12 +++--------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/internal/digraph/condition.go b/internal/digraph/condition.go index e0b4a93b8..87420a325 100644 --- a/internal/digraph/condition.go +++ b/internal/digraph/condition.go @@ -5,6 +5,8 @@ import ( "fmt" ) +var ErrConditionNotMet = fmt.Errorf("condition not met") + // Condition contains a condition and the expected value. // Conditions are evaluated and compared to the expected value. // The condition can be a command substitution or an environment variable. @@ -29,7 +31,7 @@ func (c Condition) eval(ctx context.Context) (bool, error) { // func (c Condition) evalCommand(ctx context.Context) (bool, error) { // command, err := GetContext(ctx).EvalString(c.Command) -// if err !=nil { +// if err != nil { // return false, err // } // // Run the command and get the exit code @@ -66,7 +68,7 @@ func evalCondition(ctx context.Context, c Condition) error { } if !matched { - return fmt.Errorf("error condition was not met: Condition=%s Expected=%s", c.Condition, c.Expected) + return fmt.Errorf("%w: Condition=%s Expected=%s", ErrConditionNotMet, c.Condition, c.Expected) } // Condition was met diff --git a/internal/digraph/condition_test.go b/internal/digraph/condition_test.go index e38bd1b1b..ace816133 100644 --- a/internal/digraph/condition_test.go +++ b/internal/digraph/condition_test.go @@ -49,15 +49,6 @@ func TestCondition_Eval(t *testing.T) { }, wantErr: true, }, - { - name: "InvalidCond", - condition: []Condition{ - { - Condition: "`invalid`", - }, - }, - wantErr: true, - }, } // Set environment variable for testing @@ -70,6 +61,9 @@ func TestCondition_Eval(t *testing.T) { t.Run(tt.name, func(t *testing.T) { err := EvalConditions(context.Background(), tt.condition) require.Equal(t, tt.wantErr, err != nil) + if err != nil { + require.ErrorIs(t, err, ErrConditionNotMet) + } }) } } From d048bb0af22c8350188ebc58f6f88ac3f5e32304 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 23:16:26 +0900 Subject: [PATCH 6/9] digraph: wip: command parsing --- internal/digraph/condition.go | 61 +++++++++++++++++--- internal/digraph/condition_test.go | 25 ++++++++ internal/digraph/scheduler/scheduler_test.go | 23 +++++++- 3 files changed, 99 insertions(+), 10 deletions(-) diff --git a/internal/digraph/condition.go b/internal/digraph/condition.go index 87420a325..e44a8048e 100644 --- a/internal/digraph/condition.go +++ b/internal/digraph/condition.go @@ -2,10 +2,14 @@ package digraph import ( "context" + "errors" "fmt" + "os/exec" + + "github.com/dagu-org/dagu/internal/cmdutil" ) -var ErrConditionNotMet = fmt.Errorf("condition not met") +var ErrConditionNotMet = fmt.Errorf("condition was not met") // Condition contains a condition and the expected value. // Conditions are evaluated and compared to the expected value. @@ -24,19 +28,55 @@ func (c Condition) eval(ctx context.Context) (bool, error) { case c.Condition != "": return c.evalCondition(ctx) + case c.Command != "": + return c.evalCommand(ctx) + default: return false, fmt.Errorf("invalid condition: Condition=%s", c.Condition) } } -// func (c Condition) evalCommand(ctx context.Context) (bool, error) { -// command, err := GetContext(ctx).EvalString(c.Command) -// if err != nil { -// return false, err -// } -// // Run the command and get the exit code -// exitCode, err := cmdutil.WithVariables() -// } +func (c Condition) evalCommand(ctx context.Context) (bool, error) { + var commandToRun string + if IsStepContext(ctx) { + command, err := GetStepContext(ctx).EvalString(c.Command) + if err != nil { + return false, err + } + commandToRun = command + } else if IsContext(ctx) { + command, err := GetContext(ctx).EvalString(c.Command) + if err != nil { + return false, err + } + commandToRun = command + } else { + command, err := cmdutil.EvalString(c.Command) + if err != nil { + return false, err + } + commandToRun = command + } + + shell := cmdutil.GetShellCommand("") + if shell == "" { + // Run the command directly + cmd := exec.CommandContext(ctx, commandToRun) + _, err := cmd.Output() + if err != nil { + return false, fmt.Errorf("%w: %s", ErrConditionNotMet, err) + } + return true, nil + } + + // Run the command through a shell + cmd := exec.CommandContext(ctx, shell, "-c", commandToRun) + _, err := cmd.Output() + if err != nil { + return false, fmt.Errorf("%w: %s", ErrConditionNotMet, err) + } + return true, nil +} func (c Condition) evalCondition(ctx context.Context) (bool, error) { if IsStepContext(ctx) { @@ -64,6 +104,9 @@ func (c Condition) String() string { func evalCondition(ctx context.Context, c Condition) error { matched, err := c.eval(ctx) if err != nil { + if errors.Is(err, ErrConditionNotMet) { + return err + } return fmt.Errorf("failed to evaluate condition: Condition=%s Error=%v", c.Condition, err) } diff --git a/internal/digraph/condition_test.go b/internal/digraph/condition_test.go index ace816133..af38a0c7e 100644 --- a/internal/digraph/condition_test.go +++ b/internal/digraph/condition_test.go @@ -49,6 +49,31 @@ func TestCondition_Eval(t *testing.T) { }, wantErr: true, }, + { + name: "CommandResultMet", + condition: []Condition{ + { + Command: "true", + }, + }, + }, + { + name: "CommandResultNotMet", + condition: []Condition{ + { + Command: "false", + }, + }, + wantErr: true, + }, + { + name: "CommandResultTest", + condition: []Condition{ + { + Command: "test 1 -eq 1", + }, + }, + }, } // Set environment variable for testing diff --git a/internal/digraph/scheduler/scheduler_test.go b/internal/digraph/scheduler/scheduler_test.go index b52724670..0d118406a 100644 --- a/internal/digraph/scheduler/scheduler_test.go +++ b/internal/digraph/scheduler/scheduler_test.go @@ -280,7 +280,7 @@ func TestScheduler(t *testing.T) { result.AssertNodeStatus(t, "2", scheduler.NodeStatusSkipped) result.AssertNodeStatus(t, "3", scheduler.NodeStatusSkipped) }) - t.Run("PreconditionWithCommand", func(t *testing.T) { + t.Run("PreconditionWithCommandMet", func(t *testing.T) { sc := setup(t) // 1 -> 2 (precondition not match) -> 3 @@ -295,6 +295,27 @@ func TestScheduler(t *testing.T) { result := graph.Schedule(t, scheduler.StatusSuccess) + result.AssertDoneCount(t, 3) + + result.AssertNodeStatus(t, "1", scheduler.NodeStatusSuccess) + result.AssertNodeStatus(t, "2", scheduler.NodeStatusSuccess) + result.AssertNodeStatus(t, "3", scheduler.NodeStatusSuccess) + }) + t.Run("PreconditionWithCommandNotMet", func(t *testing.T) { + sc := setup(t) + + // 1 -> 2 (precondition not match) -> 3 + graph := sc.newGraph(t, + successStep("1"), + newStep("2", withCommand("echo 2"), + withPrecondition(digraph.Condition{ + Command: "false", + })), + successStep("3", "2"), + ) + + result := graph.Schedule(t, scheduler.StatusSuccess) + result.AssertDoneCount(t, 1) // only 1 should // 1 should be executed and 2, 3 should be skipped From a5dbfb05bbdad226278095ce86b8d27a32ec58ab Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 23:33:01 +0900 Subject: [PATCH 7/9] dag: parsing precondition settings --- internal/digraph/builder.go | 38 ++++++++++++++++++++++++++++++++++- internal/digraph/condition.go | 16 +++++++++++++++ internal/digraph/spec.go | 10 ++++----- 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/internal/digraph/builder.go b/internal/digraph/builder.go index dbee1ab47..5b539d2bf 100644 --- a/internal/digraph/builder.go +++ b/internal/digraph/builder.go @@ -349,11 +349,19 @@ func buildHandlers(ctx BuildContext, spec *definition, dag *DAG) (err error) { } func buildPrecondition(ctx BuildContext, spec *definition, dag *DAG) error { + // Parse both `preconditions` and `precondition` fields. conditions, err := parsePrecondition(ctx, spec.Preconditions) if err != nil { return err } + condition, err := parsePrecondition(ctx, spec.Precondition) + if err != nil { + return err + } + dag.Preconditions = conditions + dag.Preconditions = append(dag.Preconditions, condition...) + return nil } @@ -361,6 +369,10 @@ func parsePrecondition(ctx BuildContext, precondition any) ([]Condition, error) switch v := precondition.(type) { case nil: return nil, nil + + case string: + return []Condition{{Command: v}}, nil + case map[any]any: var ret Condition for k, vv := range v { @@ -368,22 +380,38 @@ func parsePrecondition(ctx BuildContext, precondition any) ([]Condition, error) if !ok { return nil, wrapError("preconditions", k, errPreconditionKeyMustBeString) } - switch key { + + switch strings.ToLower(key) { case "condition": ret.Condition, ok = vv.(string) if !ok { return nil, wrapError("preconditions", vv, errPreconditionValueMustBeString) } + case "expected": ret.Expected, ok = vv.(string) if !ok { return nil, wrapError("preconditions", vv, errPreconditionValueMustBeString) } + + case "command": + ret.Command, ok = vv.(string) + if !ok { + return nil, wrapError("preconditions", vv, errPreconditionValueMustBeString) + } + default: return nil, wrapError("preconditions", k, fmt.Errorf("%w: %s", errPreconditionHasInvalidKey, key)) + } } + + if err := ret.Validate(); err != nil { + return nil, wrapError("preconditions", v, err) + } + return []Condition{ret}, nil + case []any: var ret []Condition for _, vv := range v { @@ -394,8 +422,10 @@ func parsePrecondition(ctx BuildContext, precondition any) ([]Condition, error) ret = append(ret, parsed...) } return ret, nil + default: return nil, wrapError("preconditions", v, errPreconditionMustBeArrayOrString) + } } @@ -544,11 +574,17 @@ func buildRepeatPolicy(_ BuildContext, def stepDef, step *Step) error { } func buildStepPrecondition(ctx BuildContext, def stepDef, step *Step) error { + // Parse both `preconditions` and `precondition` fields. conditions, err := parsePrecondition(ctx, def.Preconditions) if err != nil { return err } + condition, err := parsePrecondition(ctx, def.Precondition) + if err != nil { + return err + } step.Preconditions = conditions + step.Preconditions = append(step.Preconditions, condition...) return nil } diff --git a/internal/digraph/condition.go b/internal/digraph/condition.go index e44a8048e..e08838748 100644 --- a/internal/digraph/condition.go +++ b/internal/digraph/condition.go @@ -21,6 +21,22 @@ type Condition struct { Expected string `json:"Expected,omitempty"` // Expected value } +func (c Condition) Validate() error { + switch { + case c.Condition != "": + if c.Expected == "" { + return fmt.Errorf("expected value is required for condition: Condition=%s", c.Condition) + } + + case c.Command != "": + // Command is required + default: + return fmt.Errorf("invalid condition: Condition=%s", c.Condition) + } + + return nil +} + // eval evaluates the condition and returns the actual value. // It returns an error if the evaluation failed or the condition is invalid. func (c Condition) eval(ctx context.Context) (bool, error) { diff --git a/internal/digraph/spec.go b/internal/digraph/spec.go index 06ee8d11a..98c187cb7 100644 --- a/internal/digraph/spec.go +++ b/internal/digraph/spec.go @@ -43,6 +43,8 @@ type definition struct { RestartWaitSec int // HistRetentionDays is the retention days of the history. HistRetentionDays *int + // Precondition is the condition to run the DAG. + Precondition any // Preconditions is the condition to run the DAG. Preconditions any // MaxActiveRuns is the maximum number of concurrent steps. @@ -57,12 +59,6 @@ type definition struct { Tags any } -// conditionDef defines a condition and its expected value. -type conditionDef struct { - Condition string // Condition to evaluate - Expected string // Expected value -} - // handlerOnDef defines the steps to be executed on different events. type handlerOnDef struct { Failure *stepDef // Step to execute on failure @@ -103,6 +99,8 @@ type stepDef struct { RepeatPolicy *repeatPolicyDef // MailOnError is the flag to send mail on error. MailOnError bool + // Precondition is the condition to run the step. + Precondition any // Preconditions is the condition to run the step. Preconditions any // SignalOnStop is the signal when the step is requested to stop. From 78b067ebc2c10186c4498c878134cedebff6cb5c Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 23:44:44 +0900 Subject: [PATCH 8/9] docs: update precondition description --- docs/source/schema.rst | 48 +++++++++++++++++++++++++++++-------- docs/source/yaml_format.rst | 38 ++++++++++++++++++++++++++--- 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/docs/source/schema.rst b/docs/source/schema.rst index c15852243..c6f1a4394 100644 --- a/docs/source/schema.rst +++ b/docs/source/schema.rst @@ -152,15 +152,33 @@ These fields apply to the entire DAG. They appear at the root of the YAML file. - FOO: 1 - BAR: "`echo 2`" -``preconditions`` +``precondition`` ~~~~~~~~~~~~~~~ - A list of conditions that must be satisfied before the DAG can run. Each condition can use shell expansions or command substitutions to validate external states. + The condition(s) that must be satisfied before the DAG can run. Each condition can use shell expansions or command substitutions to validate external states. - **Example**: + **Example**: Condition based on command exit code: + + .. code-block:: yaml + + precondition: + - "test -f /path/to/file" + + # or more simply + precondition: "test -f /path/to/file" + + **Example**: Condition based on environment variables: .. code-block:: yaml - preconditions: + precondition: + - condition: "$ENV_VAR" + expected: "value" + + **Example**: Condition based on command output (stdout): + + .. code-block:: yaml + + precondition: - condition: "`echo $2`" expected: "param2" @@ -285,21 +303,31 @@ Each element in the top-level ``steps`` list has its own fields for customizatio repeat: true intervalSec: 60 # run every minute -``preconditions`` +``precondition`` ~~~~~~~~~~~~~~ - Conditions that must be met for this step to run. Each condition block has: - - - **condition** (string): A command or expression to evaluate. - - **expected** (string): The expected output. If the output matches, the step runs; otherwise, it is skipped. + Condition(s) that must be met for this step to run. It works same as the DAG-level ``precondition`` field. See :ref:`DAG-Level Fields ` for examples. .. code-block:: yaml steps: + # Example 1: based on exit code + - name: daily task + command: daily.sh + precondition: "test -f /path/to/file" + + # Example 2: based on command output (stdout) - name: monthly task command: monthly.sh - preconditions: + precondition: - condition: "`date '+%d'`" expected: "01" + + # Example 3: based on environment variables + - name: weekly task + command: weekly.sh + precondition: + - condition: "$WEEKDAY" + expected: "Friday" ``depends`` ~~~~~~~~~ diff --git a/docs/source/yaml_format.rst b/docs/source/yaml_format.rst index 482e73dd5..bb631b1f0 100644 --- a/docs/source/yaml_format.rst +++ b/docs/source/yaml_format.rst @@ -201,10 +201,42 @@ Send output to files: Conditional Execution ------------------ -Preconditions +Precondition ~~~~~~~~~~~~ Run steps only when conditions are met: +.. code-block:: yaml + + steps: + - name: monthly task + command: monthly.sh + preconditions: "test -f file.txt" # Run only if the file exists + +Use multiple conditions: + +.. code-block:: yaml + + steps: + - name: monthly task + command: monthly.sh + preconditions: # Run only if all commands exit with 0 + - "test -f file.txt" + - "test -d dir" + +Use environment variables in conditions: + +.. code-block:: yaml + + steps: + - name: monthly task + command: monthly.sh + preconditions: + - condition: "${TODAY}" # Run only if TODAY is set as "01" + expected: "01" + + +Use command substitution in conditions: + .. code-block:: yaml steps: @@ -431,7 +463,7 @@ Complete list of DAG-level configuration options: - ``delaySec``: Delay between steps - ``maxActiveRuns``: Maximum parallel steps - ``params``: Default parameters -- ``preconditions``: DAG-level conditions +- ``precondition``: DAG-level conditions - ``mailOn``: Email notification settings - ``MaxCleanUpTimeSec``: Cleanup timeout - ``handlerOn``: Lifecycle event handlers @@ -456,7 +488,7 @@ Example DAG configuration: delaySec: 1 maxActiveRuns: 1 params: param1 param2 - preconditions: + precondition: - condition: "`echo $2`" expected: "param2" mailOn: From 58efc58cc000307598d65e41a1267800977e6b2c Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 3 Jan 2025 23:50:44 +0900 Subject: [PATCH 9/9] fix issue detected by linter --- internal/digraph/executor/command.go | 6 ------ internal/digraph/step_context.go | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/digraph/executor/command.go b/internal/digraph/executor/command.go index 3db8a05da..25b7ee721 100644 --- a/internal/digraph/executor/command.go +++ b/internal/digraph/executor/command.go @@ -6,7 +6,6 @@ import ( "io" "os" "os/exec" - "strings" "sync" "syscall" @@ -74,11 +73,6 @@ func createShellCommand(ctx context.Context, shell string, step digraph.Step, ar return exec.CommandContext(ctx, shell, "-c", command) } -// buildCommandString combines the command and arguments into a single string -func buildCommandString(command string, args []string) string { - return fmt.Sprintf("%s %s", command, strings.Join(args, " ")) -} - func (e *commandExecutor) Run(_ context.Context) error { e.lock.Lock() err := e.cmd.Start() diff --git a/internal/digraph/step_context.go b/internal/digraph/step_context.go index ecf59af11..323360165 100644 --- a/internal/digraph/step_context.go +++ b/internal/digraph/step_context.go @@ -61,7 +61,7 @@ func (c StepContext) MailerConfig() (mailer.Config, error) { }) } -func (c StepContext) EvalString(s string, opts ...cmdutil.EvalOption) (string, error) { +func (c StepContext) EvalString(s string) (string, error) { return cmdutil.EvalString(s, cmdutil.WithVariables(c.envs), cmdutil.WithVariables(c.outputVariables.Variables()),