diff --git a/cmd/dry.go b/cmd/dry.go index 2303c8ad0..9aab330d9 100644 --- a/cmd/dry.go +++ b/cmd/dry.go @@ -97,5 +97,7 @@ func runDry(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to execute DAG %s (requestID: %s): %w", dag.Name, requestID, err) } + agt.PrintSummary(ctx) + return nil } diff --git a/cmd/retry.go b/cmd/retry.go index b76bc195c..37e9ba4f9 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -144,7 +144,10 @@ func executeRetry(ctx context.Context, dag *digraph.DAG, setup *setup, originalS agt.PrintSummary(ctx) return fmt.Errorf("failed to execute DAG %s (requestID: %s): %w", dag.Name, newRequestID, err) } + } + if !quiet { + agt.PrintSummary(ctx) } return nil diff --git a/cmd/start.go b/cmd/start.go index d995ce3ce..9609fd948 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -137,6 +137,10 @@ func executeDag(ctx context.Context, setup *setup, specPath string, loadOpts []d } } + if !quiet { + agt.PrintSummary(ctx) + } + return nil } diff --git a/docs/source/yaml_format.rst b/docs/source/yaml_format.rst index 48569a4de..4e51656b4 100644 --- a/docs/source/yaml_format.rst +++ b/docs/source/yaml_format.rst @@ -542,25 +542,6 @@ Execute steps periodically: repeat: true intervalSec: 60 -User Defined Functions -~~~~~~~~~~~~~~~~~~~ -Create reusable task templates: - -.. code-block:: yaml - - functions: - - name: my_function - params: param1 param2 - command: python main.py $param1 $param2 - - steps: - - name: use function - call: - function: my_function - args: - param1: 1 - param2: 2 - Field Reference ------------- diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index 6bc7558f0..12332f797 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -168,7 +168,7 @@ func TestAgent_Retry(t *testing.T) { // Modify the DAG to make it successful status := dagAgent.Status() for i := range status.Nodes { - status.Nodes[i].Step.CmdWithArgs = "true" + status.Nodes[i].Step.CmdArgsSys = "true" } // Retry the DAG and check if it is successful diff --git a/internal/cmdutil/cmd.go b/internal/cmdutil/cmd.go index 25492bd0c..51985c525 100644 --- a/internal/cmdutil/cmd.go +++ b/internal/cmdutil/cmd.go @@ -8,6 +8,25 @@ import ( "unicode" ) +// ArgsDelimiter is the delimiter used to separate command arguments +const ArgsDelimiter = "∯ᓰ♨" + +// JoinCommandArgs joins a command and its arguments into a single string +// separated by ArgsDelimiter +func JoinCommandArgs(cmd string, args []string) string { + return fmt.Sprintf("%s %s", cmd, strings.Join(args, ArgsDelimiter)) +} + +// SplitCommandArgs splits a command and its arguments into a command and a slice of arguments +func SplitCommandArgs(cmdWithArgs string) (string, []string) { + parts := strings.SplitN(cmdWithArgs, " ", 2) + if len(parts) == 1 { + return parts[0], nil + } + command, args := parts[0], parts[1] + return command, strings.Split(args, ArgsDelimiter) +} + // GetShellCommand returns the shell to use for command execution func GetShellCommand(configuredShell string) string { if configuredShell != "" { diff --git a/internal/cmdutil/cmd_test.go b/internal/cmdutil/cmd_test.go index dda4ee16d..8088b6cbc 100644 --- a/internal/cmdutil/cmd_test.go +++ b/internal/cmdutil/cmd_test.go @@ -102,6 +102,12 @@ func TestSplitCommand(t *testing.T) { wantCmd: "echo", wantArgs: []string{`"\"hello world\""`}, }, + { + name: "command with JSON", + input: `echo "{\n\t\"key\": \"value\"\n}"`, + wantCmd: "echo", + wantArgs: []string{`"{\n\t\"key\": \"value\"\n}"`}, + }, } for _, tt := range tests { diff --git a/internal/digraph/builder.go b/internal/digraph/builder.go index a556b1192..6a7dba1c2 100644 --- a/internal/digraph/builder.go +++ b/internal/digraph/builder.go @@ -76,9 +76,9 @@ type builderEntry struct { } var stepBuilderRegistry = []stepBuilderEntry{ + {name: "executor", fn: buildExecutor}, {name: "command", fn: buildCommand}, {name: "depends", fn: buildDepends}, - {name: "executor", fn: buildExecutor}, {name: "subworkflow", fn: buildSubWorkflow}, {name: "continueOn", fn: buildContinueOn}, {name: "retryPolicy", fn: buildRetryPolicy}, diff --git a/internal/digraph/builder_test.go b/internal/digraph/builder_test.go index 184c30dc8..dd8e10a51 100644 --- a/internal/digraph/builder_test.go +++ b/internal/digraph/builder_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/dagu-org/dagu/internal/cmdutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -288,7 +289,9 @@ func TestBuildStep(t *testing.T) { t.Run("ValidCommandInArray", func(t *testing.T) { th := loadTestYAML(t, "valid_command_in_array.yaml") assert.Len(t, th.Steps, 1) - assert.Equal(t, `echo "1"`, th.Steps[0].CmdWithArgs) + assert.Equal(t, + cmdutil.JoinCommandArgs("echo", []string{"1"}), + th.Steps[0].CmdArgsSys) assert.Equal(t, "echo", th.Steps[0].Command) assert.Equal(t, []string{"1"}, th.Steps[0].Args) assert.Equal(t, "step 1", th.Steps[0].Name) @@ -296,7 +299,9 @@ func TestBuildStep(t *testing.T) { t.Run("ValidCommandInList", func(t *testing.T) { th := loadTestYAML(t, "valid_command_in_list.yaml") assert.Len(t, th.Steps, 1) - assert.Equal(t, `echo "1"`, th.Steps[0].CmdWithArgs) + assert.Equal(t, + cmdutil.JoinCommandArgs("echo", []string{"1"}), + th.Steps[0].CmdArgsSys) assert.Equal(t, "echo", th.Steps[0].Command) assert.Equal(t, []string{"1"}, th.Steps[0].Args) assert.Equal(t, "step 1", th.Steps[0].Name) diff --git a/internal/digraph/command.go b/internal/digraph/command.go index a2fd41475..d89cddeea 100644 --- a/internal/digraph/command.go +++ b/internal/digraph/command.go @@ -55,6 +55,9 @@ func buildCommand(_ BuildContext, def stepDef, step *Step) error { case []any: // Case 3: command is an array + + var command string + var args []string for _, v := range val { val, ok := v.(string) if !ok { @@ -62,11 +65,11 @@ func buildCommand(_ BuildContext, def stepDef, step *Step) error { // This is useful when the value is an integer for example. val = fmt.Sprintf("%v", v) } - if step.Command == "" { - step.Command = val + if command == "" { + command = val continue } - step.Args = append(step.Args, val) + args = append(args, val) } // Setup CmdWithArgs (this will be actually used in the command execution) @@ -77,7 +80,11 @@ func buildCommand(_ BuildContext, def stepDef, step *Step) error { } sb.WriteString(fmt.Sprintf("%q", arg)) } + + step.Command = command + step.Args = args step.CmdWithArgs = fmt.Sprintf("%s %s", step.Command, sb.String()) + step.CmdArgsSys = cmdutil.JoinCommandArgs(step.Command, step.Args) default: // Unknown type for command field. diff --git a/internal/digraph/step_context.go b/internal/digraph/context_step.go similarity index 99% rename from internal/digraph/step_context.go rename to internal/digraph/context_step.go index 647bf911b..9cb17556e 100644 --- a/internal/digraph/step_context.go +++ b/internal/digraph/context_step.go @@ -11,7 +11,6 @@ import ( type StepContext struct { Context - outputVariables *SyncMap step Step envs map[string]string diff --git a/internal/digraph/executor/command.go b/internal/digraph/executor/command.go index 66c84c058..da3aa680e 100644 --- a/internal/digraph/executor/command.go +++ b/internal/digraph/executor/command.go @@ -12,7 +12,6 @@ import ( "github.com/dagu-org/dagu/internal/cmdutil" "github.com/dagu-org/dagu/internal/digraph" "github.com/dagu-org/dagu/internal/fileutil" - "github.com/dagu-org/dagu/internal/logger" ) var _ Executor = (*commandExecutor)(nil) @@ -102,22 +101,12 @@ func newCommand(ctx context.Context, step digraph.Step) (Executor, error) { } func createCommand(ctx context.Context, step digraph.Step) (*exec.Cmd, error) { - stepContext := digraph.GetStepContext(ctx) - var args []string - for _, arg := range step.Args { - ret, err := stepContext.EvalString(arg, cmdutil.OnlyReplaceVars()) - if err != nil { - logger.Error(ctx, "Failed to evaluate string", "arg", arg, "err", err) - return nil, err - } - args = append(args, ret) - } - shellCommand := cmdutil.GetShellCommand(step.Shell) - if shellCommand == "" { - return createDirectCommand(ctx, step, args), nil + shellCmdArgs := step.ShellCmdArgs + if shellCommand == "" || shellCmdArgs == "" { + return createDirectCommand(ctx, step, step.Args), nil } - return createShellCommand(ctx, shellCommand, step, args), nil + return createShellCommand(ctx, shellCommand, shellCmdArgs), nil } // createDirectCommand creates a command that runs directly without a shell @@ -127,7 +116,6 @@ func createDirectCommand(ctx context.Context, step digraph.Step, args []string) } // createShellCommand creates a command that runs through a shell -func createShellCommand(ctx context.Context, shell string, step digraph.Step, args []string) *exec.Cmd { - command := cmdutil.BuildCommandEscapedString(step.Command, args) - return exec.CommandContext(ctx, shell, "-c", command) +func createShellCommand(ctx context.Context, shell, shellCmd string) *exec.Cmd { + return exec.CommandContext(ctx, shell, "-c", shellCmd) } diff --git a/internal/digraph/scheduler/node.go b/internal/digraph/scheduler/node.go index 86c760cdb..c153aeb32 100644 --- a/internal/digraph/scheduler/node.go +++ b/internal/digraph/scheduler/node.go @@ -43,6 +43,7 @@ type Node struct { scriptFile *os.File done bool retryPolicy retryPolicy + cmdEvaluated bool } type NodeData struct { @@ -228,7 +229,7 @@ func (n *Node) State() NodeState { // Execute runs the command synchronously and returns error if any. func (n *Node) Execute(ctx context.Context) error { - cmd, err := n.SetupExec(ctx) + cmd, err := n.setupExec(ctx) if err != nil { return err } @@ -309,7 +310,7 @@ func (n *Node) Finish() { n.data.State.FinishedAt = time.Now() } -func (n *Node) SetupExec(ctx context.Context) (executor.Executor, error) { +func (n *Node) setupExec(ctx context.Context) (executor.Executor, error) { n.mu.Lock() defer n.mu.Unlock() @@ -324,24 +325,9 @@ func (n *Node) SetupExec(ctx context.Context) (executor.Executor, error) { n.data.State.Error = nil n.data.State.ExitCode = 0 - if n.data.Step.CmdWithArgs != "" { - // Expand envs - stepContext := digraph.GetStepContext(ctx) - cmdWithArgs, err := stepContext.EvalString(n.data.Step.CmdWithArgs, cmdutil.WithoutExpandEnv()) - if err != nil { - return nil, err - } - cmd, args, err := cmdutil.SplitCommandWithSub(cmdWithArgs) - if err != nil { - return nil, fmt.Errorf("failed to split command: %w", err) - } - n.data.Step.Command = cmd - n.data.Step.Args = args - } - - if n.data.Step.Command == "" { - // If the command is empty, use the default shell as the command - n.data.Step.Command = cmdutil.GetShellCommand(n.data.Step.Shell) + // Evaluate the command and args if not already evaluated + if err := n.evaluateCommandArgs(ctx); err != nil { + return nil, err } if n.scriptFile != nil { @@ -385,6 +371,101 @@ func (n *Node) SetupExec(ctx context.Context) (executor.Executor, error) { return cmd, nil } +func (n *Node) evaluateCommandArgs(ctx context.Context) error { + if n.cmdEvaluated { + return nil + } + + stepContext := digraph.GetStepContext(ctx) + switch { + case n.data.Step.CmdArgsSys != "": + // In case of the command and args are defined as a list. In this case, + // CmdArgsSys is a string with the command and args separated by special markers. + cmd, args := cmdutil.SplitCommandArgs(n.data.Step.CmdArgsSys) + for i, arg := range args { + value, err := stepContext.EvalString(arg, cmdutil.WithoutExpandEnv()) + if err != nil { + return fmt.Errorf("failed to eval command with args: %w", err) + } + args[i] = value + } + n.data.Step.Command = cmd + n.data.Step.Args = args + + if n.data.Step.ExecutorConfig.IsCommand() { + n.data.Step.ShellCmdArgs = cmdutil.BuildCommandEscapedString(cmd, args) + } + + case n.data.Step.CmdWithArgs != "": + // In case of the command and args are defined as a string. + stepContext := digraph.GetStepContext(ctx) + cmdWithArgs, err := stepContext.EvalString(n.data.Step.CmdWithArgs, cmdutil.WithoutExpandEnv()) + if err != nil { + return err + } + + // Use user defined command as the shell command args that should be already a valid command. + if n.data.Step.ExecutorConfig.IsCommand() { + n.data.Step.ShellCmdArgs = cmdWithArgs + } + + // Split the command and args in case shell is not available in the system. + // In this case, the command and args need to be split to run the command directly. + cmd, args, err := cmdutil.SplitCommand(cmdWithArgs) + if err != nil { + return fmt.Errorf("failed to split command with args: %w", err) + } + + n.data.Step.Command = cmd + n.data.Step.Args = args + + case n.data.Step.Command == "": + // If the command is empty, use the default shell as the command + n.data.Step.Command = cmdutil.GetShellCommand(n.data.Step.Shell) + + case n.data.Step.Command != "" && len(n.data.Step.Args) == 0: + // Shouldn't reach here except for testing. + + cmd, args, err := cmdutil.SplitCommand(n.data.Step.Command) + if err != nil { + return fmt.Errorf("failed to split command: %w", err) + } + for i, arg := range args { + value, err := stepContext.EvalString(arg, cmdutil.WithoutExpandEnv()) + if err != nil { + return fmt.Errorf("failed to eval command args: %w", err) + } + args[i] = value + } + + n.data.Step.CmdWithArgs = n.data.Step.Command + n.data.Step.Command = cmd + n.data.Step.Args = args + + default: + // Shouldn't reach here except for testing. + + if n.data.Step.Command != "" { + value, err := stepContext.EvalString(n.data.Step.Command, cmdutil.WithoutExpandEnv()) + if err != nil { + return fmt.Errorf("failed to eval command: %w", err) + } + n.data.Step.Command = value + } + + for i, arg := range n.data.Step.Args { + value, err := stepContext.EvalString(arg, cmdutil.WithoutExpandEnv()) + if err != nil { + return fmt.Errorf("failed to eval command args: %w", err) + } + n.data.Step.Args[i] = value + } + } + + n.cmdEvaluated = true + return nil +} + func (n *Node) GetRetryCount() int { n.mu.RLock() defer n.mu.RUnlock() diff --git a/internal/digraph/scheduler/node_test.go b/internal/digraph/scheduler/node_test.go index 66dc02acc..59e40d949 100644 --- a/internal/digraph/scheduler/node_test.go +++ b/internal/digraph/scheduler/node_test.go @@ -205,58 +205,50 @@ func TestNode(t *testing.T) { node.Execute(t) node.AssertOutput(t, "OUTPUT_JSON_TEST", `{"key":"value"}`) }) - t.Run("OutputSpecialChar", func(t *testing.T) { - t.Parallel() - - testCases := []struct { - CmdWithArgs string - Want string - }{ - { - CmdWithArgs: `echo "hello\tworld"`, - Want: `hello\tworld`, - }, - { - CmdWithArgs: `echo hello"\t"world`, - Want: `hello\tworld`, - }, - { - CmdWithArgs: `echo hello\tworld`, - Want: `hello\tworld`, - }, - { - CmdWithArgs: `echo hello\nworld`, - Want: `hello\nworld`, - }, - { - CmdWithArgs: `echo {\"key\":\"value\"}`, - Want: `{"key":"value"}`, - }, - { - CmdWithArgs: `echo "{\"key\":\"value\"}"`, - Want: `{"key":"value"}`, - }, - { - CmdWithArgs: `echo 'hello world'`, - Want: `hello world`, - }, - { - CmdWithArgs: `echo hello "world"`, - Want: `hello world`, - }, - { - CmdWithArgs: `echo 'hello "world"'`, - Want: `hello "world"`, - }, - } - - for i, tc := range testCases { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - node := setupNode(t, withNodeCmdArgs(tc.CmdWithArgs), withNodeOutput("OUTPUT_SPECIALCHAR_TEST")) - node.Execute(t) - node.AssertOutput(t, "OUTPUT_SPECIALCHAR_TEST", tc.Want) - }) - } + t.Run("OutputTabWithDoubleQuotes", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo "hello\tworld"`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", "hello\tworld") + }) + t.Run("OutputTabWithMixedQuotes", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo hello"\t"world`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", "hello\tworld") // This behavior is aligned with bash + }) + t.Run("OutputTabWithoutQuotes", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo hello\tworld`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", `hellotworld`) // This behavior is aligned with bash + }) + t.Run("OutputNewlineCharacter", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo hello\nworld`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", `hellonworld`) // This behavior is aligned with bash + }) + t.Run("OutputEscapedJSONWithoutQuotes", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo {\"key\":\"value\"}`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", `{"key":"value"}`) + }) + t.Run("OutputEscapedJSONWithQuotes", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo "{\"key\":\"value\"}"`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", `{"key":"value"}`) + }) + t.Run("OutputSingleQuotedString", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo 'hello world'`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", `hello world`) + }) + t.Run("OutputMixedQuotesWithSpace", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo hello "world"`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", `hello world`) + }) + t.Run("OutputNestedQuotes", func(t *testing.T) { + node := setupNode(t, withNodeCmdArgs(`echo 'hello "world"'`), withNodeOutput("OUTPUT")) + node.Execute(t) + node.AssertOutput(t, "OUTPUT", `hello "world"`) }) t.Run("Script", func(t *testing.T) { node := setupNode(t, withNodeScript("echo hello"), withNodeOutput("SCRIPT_TEST")) diff --git a/internal/digraph/scheduler/scheduler_test.go b/internal/digraph/scheduler/scheduler_test.go index 2de4c71bb..4edf4e104 100644 --- a/internal/digraph/scheduler/scheduler_test.go +++ b/internal/digraph/scheduler/scheduler_test.go @@ -706,6 +706,25 @@ func TestScheduler(t *testing.T) { output, _ := node.Data().Step.OutputVariables.Load("RESULT") require.Equal(t, "RESULT=value", output, "expected output %q, got %q", "value", output) }) + t.Run("HandlingJSONWithSpecialChars", func(t *testing.T) { + sc := setup(t) + + jsonData := `{\n\t"key": "value"\n}` + graph := sc.newGraph(t, + newStep("1", withCommand(fmt.Sprintf("echo '%s'", jsonData)), withOutput("OUT")), + newStep("2", withCommand("echo '${OUT.key}'"), withDepends("1"), withOutput("RESULT")), + ) + + result := graph.Schedule(t, scheduler.StatusSuccess) + + result.AssertDoneCount(t, 2) + + // check if RESULT variable is set to "value" + node := result.Node(t, "2") + + output, _ := node.Data().Step.OutputVariables.Load("RESULT") + require.Equal(t, "RESULT=value", output, "expected output %q, got %q", "value", output) + }) t.Run("SpecialVars_DAG_EXECUTION_LOG_PATH", func(t *testing.T) { sc := setup(t) diff --git a/internal/digraph/step.go b/internal/digraph/step.go index b5afd0b94..7b498aade 100644 --- a/internal/digraph/step.go +++ b/internal/digraph/step.go @@ -23,10 +23,14 @@ type Step struct { Dir string `json:"Dir,omitempty"` // ExecutorConfig contains the configuration for the executor. ExecutorConfig ExecutorConfig `json:"ExecutorConfig,omitempty"` - // CmdWithArgs is the command with arguments. + // CmdWithArgs is the command with arguments (only display purpose). CmdWithArgs string `json:"CmdWithArgs,omitempty"` + // CmdArgsSys is the command with arguments for the system. + CmdArgsSys string `json:"CmdArgsSys,omitempty"` // Command specifies only the command without arguments. Command string `json:"Command,omitempty"` + // ShellCmdArgs is the shell command with arguments. + ShellCmdArgs string `json:"ShellCmdArgs,omitempty"` // Script is the script to be executed. Script string `json:"Script,omitempty"` // Args contains the arguments for the command. @@ -102,6 +106,11 @@ type ExecutorConfig struct { Config map[string]any `json:"Config,omitempty"` // Config contains executor-specific configuration. } +// IsCommand returns true if the executor is a command +func (e ExecutorConfig) IsCommand() bool { + return e.Type == "" || e.Type == "command" +} + // RetryPolicy contains the retry policy for a step. type RetryPolicy struct { // Limit is the number of retries allowed. diff --git a/internal/persistence/jsondb/jsondb.go b/internal/persistence/jsondb/jsondb.go index a147eb4d1..96f6ab2d7 100644 --- a/internal/persistence/jsondb/jsondb.go +++ b/internal/persistence/jsondb/jsondb.go @@ -19,6 +19,7 @@ import ( "time" "github.com/dagu-org/dagu/internal/fileutil" + "github.com/dagu-org/dagu/internal/logger" "github.com/dagu-org/dagu/internal/persistence" "github.com/dagu-org/dagu/internal/persistence/filecache" "github.com/dagu-org/dagu/internal/persistence/model" @@ -115,12 +116,14 @@ func (db *JSONDB) Update(ctx context.Context, key, requestID string, status mode return writer.write(status) } -func (db *JSONDB) Open(_ context.Context, key string, timestamp time.Time, requestID string) error { +func (db *JSONDB) Open(ctx context.Context, key string, timestamp time.Time, requestID string) error { filePath, err := db.generateFilePath(key, newUTC(timestamp), requestID) if err != nil { return err } + logger.Infof(ctx, "Initializing status file: %s", filePath) + writer := newWriter(filePath) if err := writer.open(); err != nil { return err