From b48a0f52ed6ba8d75e2b8d5c5110e47bb488c4ca Mon Sep 17 00:00:00 2001 From: Pablo Cantera Date: Fri, 29 Dec 2017 11:47:54 +0100 Subject: [PATCH] Improvements and process adjustments (issue #14) - Refactor Marshal/Unmarshal - Polling: new strategy implemented with 2 time cycling: longTime/shortTime (Defaults: 30/5 seconds). Looking for a better performance and not to overload the API channel: Short time lapse period in order to avoid long waits when commands are pending. Long time lapse period for waiting new commands along polling life cycle. - exec: Polling: new command which executes the received command and manages two output pipes (output and error). It shouldn't throw any exception/error or stop the process at that level Continuous report: only threshold Time is required Closes #14 --- cmdpolling/continuousreport.go | 29 +++++---- cmdpolling/polling.go | 107 +++++++++++++++++++++------------ cmdpolling/subcommands.go | 30 ++++----- utils/exec.go | 101 ++++++++++++++++++++++--------- 4 files changed, 168 insertions(+), 99 deletions(-) diff --git a/cmdpolling/continuousreport.go b/cmdpolling/continuousreport.go index a20ba92..6183388 100644 --- a/cmdpolling/continuousreport.go +++ b/cmdpolling/continuousreport.go @@ -3,21 +3,20 @@ package cmdpolling import ( "errors" "fmt" - "time" "os" - "encoding/json" + "time" log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" - "github.com/ingrammicro/concerto/api/types" "github.com/ingrammicro/concerto/cmd" "github.com/ingrammicro/concerto/utils" "github.com/ingrammicro/concerto/utils/format" ) const ( - RetriesNumber = 5 - RetriesFactor = 3 + RetriesNumber = 5 + RetriesFactor = 3 + DefaultThresholdTime = 10 ) func cmdContinuousReportRun(c *cli.Context) error { @@ -34,22 +33,22 @@ func cmdContinuousReportRun(c *cli.Context) error { formatter.PrintFatal("argument missing", errors.New("a script or command is required")) } - // cli command thresholds flags - thresholds := utils.Thresholds{Lines: c.Int("lines"), Time: c.Int("time"), Bytes: c.Int("bytes")} + // cli command threshold flag + thresholdTime := c.Int("time") + if !(thresholdTime > 0) { + thresholdTime = DefaultThresholdTime + } + log.Debug("Time threshold:", thresholdTime) // Custom method for chunks processing fn := func(chunk string) error { log.Debug("sendChunks") - err := retry(RetriesNumber, time.Second, func() error { log.Debug("Sending: ", chunk) - command := types.PollingContinuousReport{ - Stdout: chunk, - } - var commandIn map[string]interface{} - inRec, _ := json.Marshal(command) - json.Unmarshal(inRec, &commandIn) + commandIn := map[string]interface{}{ + "stdout": chunk, + } _, statusCode, err := pollingSvc.ReportBootstrapLog(&commandIn) switch { @@ -71,7 +70,7 @@ func cmdContinuousReportRun(c *cli.Context) error { return nil } - exitCode, err := utils.RunContinuousReportRun(fn, cmdArg, thresholds) + exitCode, err := utils.RunContinuousCmd(fn, cmdArg, thresholdTime) if err != nil { formatter.PrintFatal("cannot process continuous report command", err) } diff --git a/cmdpolling/polling.go b/cmdpolling/polling.go index 30d724d..e3c290e 100644 --- a/cmdpolling/polling.go +++ b/cmdpolling/polling.go @@ -6,7 +6,6 @@ import ( "os/signal" "syscall" "time" - "encoding/json" "strings" log "github.com/Sirupsen/logrus" @@ -18,8 +17,9 @@ import ( ) const ( - DefaultPollingPingTimingInterval = 30 - ProcessIdFile = "imco-polling.pid" + DefaultPollingPingTimingIntervalLong = 30 + DefaultPollingPingTimingIntervalShort = 5 + ProcessIdFile = "imco-polling.pid" ) var ( @@ -37,8 +37,8 @@ func handleSysSignals(cancelFunc context.CancelFunc) { } // Returns the full path to the tmp folder joined with pid management file name -func getProcessIdFilePath() string{ - return strings.Join([]string{os.TempDir(), string(os.PathSeparator), ProcessIdFile}, "") +func getProcessIdFilePath() string { + return strings.Join([]string{os.TempDir(), string(os.PathSeparator), ProcessIdFile}, "") } // Start the polling process @@ -50,18 +50,24 @@ func cmdStart(c *cli.Context) error { formatter.PrintFatal("cannot create the pid file", err) } - pollingPingTimingInterval := c.Int64("time") - if !(pollingPingTimingInterval > 0) { - pollingPingTimingInterval = DefaultPollingPingTimingInterval + pollingPingTimingIntervalLong := c.Int64("longTime") + if !(pollingPingTimingIntervalLong > 0) { + pollingPingTimingIntervalLong = DefaultPollingPingTimingIntervalLong } - log.Debug("Ping time interval:", pollingPingTimingInterval) + log.Debug("Ping long time interval:", pollingPingTimingIntervalLong) + + pollingPingTimingIntervalShort := c.Int64("shortTime") + if !(pollingPingTimingIntervalShort > 0) { + pollingPingTimingIntervalShort = DefaultPollingPingTimingIntervalShort + } + log.Debug("Ping short time interval:", pollingPingTimingIntervalShort) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go handleSysSignals(cancel) - pingRoutine(ctx, c, pollingPingTimingInterval) + pingRoutine(ctx, c, pollingPingTimingIntervalLong, pollingPingTimingIntervalShort) return nil } @@ -80,41 +86,69 @@ func cmdStop(c *cli.Context) error { } // Main polling background routine -func pingRoutine(ctx context.Context, c *cli.Context, pollingPingTimingInterval int64) { +func pingRoutine(ctx context.Context, c *cli.Context, longTimePeriod int64, shortTimePeriod int64) { log.Debug("pingRoutine") formatter := format.GetFormatter() pollingSvc := cmd.WireUpPolling(c) + // initialization isRunningCommandRoutine := false - t := time.NewTicker(time.Duration(pollingPingTimingInterval) * time.Second) + longTicker := time.NewTicker(time.Duration(longTimePeriod) * time.Second) + shortTicker := time.NewTicker(time.Duration(shortTimePeriod) * time.Second) + currentTicker := longTicker for { - log.Debug("Requesting for candidate commands status") - ping, status, err := pollingSvc.Ping() - if err != nil { - formatter.PrintError("Couldn't receive polling ping data", err) - } else { - // One command is available, and no process running - if status == 201 && ping.PendingCommands && !isRunningCommandRoutine { - log.Debug("Detected a candidate command") - isRunningCommandRoutine = true - go processingCommandRoutine(pollingSvc, formatter) + // no need to request until current command ends + if !isRunningCommandRoutine { + log.Debug("Requesting for candidate commands status") + ping, status, err := pollingSvc.Ping() + if err != nil { + formatter.PrintError("Couldn't receive polling ping data", err) + // in low level error, should the ticker set as the the longest time interval? + } else { + // One command is available + if status == 201 { + if ping.PendingCommands { + log.Debug("Detected a candidate command") + isRunningCommandRoutine = true + // set short interval timing!? + // - by default this implies next interval timing sil be short. + // - If command routine has a over delay, long interval timing will be assigned later + if currentTicker != shortTicker { + log.Debug("Ticker assigned: Short") + shortTicker = time.NewTicker(time.Duration(shortTimePeriod) * time.Second) + currentTicker = shortTicker + } + go processingCommandRoutine(pollingSvc, formatter) + } else { + // Since no pending command, long interval timing + log.Debug("Ticker assigned: Long") + currentTicker = time.NewTicker(time.Duration(longTimePeriod) * time.Second) + } + } } } - select { - case <-commandProcessed: - isRunningCommandRoutine = false - default: - } + log.Debug("Waiting...", currentTicker) select { - case <-t.C: + case <-currentTicker.C: case <-ctx.Done(): log.Debug(ctx.Err()) log.Debug("closing polling") return } + + select { + case <-commandProcessed: + isRunningCommandRoutine = false + default: + // if command routine is currently running and short interval timing runs out, a long interval timing is assigned + if isRunningCommandRoutine && currentTicker == shortTicker { + log.Debug("Ticker re-assigned: Long") + currentTicker = time.NewTicker(time.Duration(longTimePeriod) * time.Second) + } + } } } @@ -132,19 +166,18 @@ func processingCommandRoutine(pollingSvc *polling.PollingService, formatter form // 2. Execute the retrieved command if status == 200 { log.Debug("Running the retrieved command") - command.Stdout, command.ExitCode, _, _ = utils.RunCmd(command.Script) - command.Stderr = "" - if command.ExitCode != 0 { - command.Stderr = command.Stdout - command.Stdout = "" - } + command.ExitCode, command.Stdout, command.Stderr, _, _ = utils.RunTracedCmd(command.Script) // 3. then status is propagated to IMCO log.Debug("Reporting command execution status") - var commandIn map[string]interface{} - inRec, _ := json.Marshal(command) - json.Unmarshal(inRec, &commandIn) + commandIn := map[string]interface{}{ + "id": command.Id, + "script": command.Script, + "stdout": command.Stdout, + "stderr": command.Stderr, + "exit_code": command.ExitCode, + } _, status, err := pollingSvc.UpdateCommand(&commandIn, command.Id) if err != nil { diff --git a/cmdpolling/subcommands.go b/cmdpolling/subcommands.go index fd73f46..1c18674 100644 --- a/cmdpolling/subcommands.go +++ b/cmdpolling/subcommands.go @@ -2,7 +2,6 @@ package cmdpolling import ( "github.com/codegangsta/cli" - "github.com/ingrammicro/concerto/utils" ) func SubCommands() []cli.Command { @@ -18,9 +17,14 @@ func SubCommands() []cli.Command { Action: cmdStart, Flags: []cli.Flag{ cli.Int64Flag{ - Name: "time, t", - Usage: "Polling ping time interval (seconds)", - Value: 30, + Name: "longTime, l", + Usage: "Polling ping long time interval (seconds)", + Value: DefaultPollingPingTimingIntervalLong, + }, + cli.Int64Flag{ + Name: "shortTime, s", + Usage: "Polling ping short time interval (seconds)", + Value: DefaultPollingPingTimingIntervalShort, }, }, }, @@ -30,25 +34,15 @@ func SubCommands() []cli.Command { Action: cmdStop, }, { - Name: "continuous-report-run", - Usage: "Runs a script and gradually report its output", - Action: cmdContinuousReportRun, + Name: "continuous-report-run", + Usage: "Runs a script and gradually report its output", + Action: cmdContinuousReportRun, ArgsUsage: "script", Flags: []cli.Flag{ - cli.IntFlag{ - Name: "lines, l", - Usage: "Maximum lines threshold per response chunk", - Value: utils.DefaultThresholdLines, - }, cli.IntFlag{ Name: "time, t", Usage: "Maximum time -seconds- threshold per response chunk", - Value: utils.DefaultThresholdTime, - }, - cli.IntFlag{ - Name: "bytes, b", - Usage: "Maximum bytes threshold per response chunk", - Value: utils.DefaultThresholdBytes, + Value: DefaultThresholdTime, }, }, }, diff --git a/utils/exec.go b/utils/exec.go index 1424287..249d7e8 100644 --- a/utils/exec.go +++ b/utils/exec.go @@ -19,17 +19,8 @@ import ( const ( TimeStampLayout = "2006-01-02T15:04:05.000000-07:00" TimeLayoutYYYYMMDDHHMMSS = "20060102150405" - DefaultThresholdLines = 10 - DefaultThresholdTime = 10 - DefaultThresholdBytes = 500 ) -type Thresholds struct { - Lines int - Time int - Bytes int -} - func extractExitCode(err error) int { if err != nil { switch err.(type) { @@ -148,20 +139,75 @@ func RunCmd(command string) (output string, exitCode int, startedAt time.Time, f return } -func RunContinuousReportRun(fn func(chunk string) error, cmdArg string, thresholds Thresholds) (int, error) { - log.Debug("RunContinuousReportRun") +// RunTracedCmd executes the received command and manages two output pipes (output and error) +// It shouldn't throw any exception/error or stop the process. +func RunTracedCmd(command string) (exitCode int, stdOut string, stdErr string, startedAt time.Time, finishedAt time.Time) { + + var cmd *exec.Cmd + + if runtime.GOOS == "windows" { + log.Infof("Command: %s", command) + cmd = exec.Command("cmd", "/C", command) + } else { + log.Infof("Command: %s %s", "/bin/sh -c", command) + cmd = exec.Command("/bin/sh", "-c", command) + } + + stdoutIn, err := cmd.StdoutPipe() + if err != nil { + log.Error("cmd.StdoutPipe() failed: ", err) + } + + stderrIn, err := cmd.StderrPipe() + if err != nil { + log.Error("cmd.StderrPipe() failed: ", err) + } + + var errStdout, errStderr error + var stdoutBuf, stderrBuf bytes.Buffer + stdout := io.MultiWriter(os.Stdout, &stdoutBuf) + stderr := io.MultiWriter(os.Stderr, &stderrBuf) - // command thresholds flags - if !(thresholds.Lines > 0) { - thresholds.Lines = DefaultThresholdLines + if err = cmd.Start(); err != nil { + log.Error("cmd.Start() failed: ", err) } - if !(thresholds.Time > 0) { - thresholds.Time = DefaultThresholdTime + + go func() { + _, errStdout = io.Copy(stdout, stdoutIn) + }() + + go func() { + _, errStderr = io.Copy(stderr, stderrIn) + }() + + if err = cmd.Wait(); err != nil { + log.Error("cmd.Wait() failed: ", err) } - if !(thresholds.Bytes > 0) { - thresholds.Bytes = DefaultThresholdBytes + + if errStdout != nil { + log.Error("failed to capture stdout: ", errStdout) + } + + if errStderr != nil { + log.Error("failed to capture stderr: ", errStderr) } - log.Debug("Threshold", thresholds) + + exitCode = extractExitCode(err) + stdOut = string(stdoutBuf.Bytes()) + stdErr = string(stderrBuf.Bytes()) + startedAt = time.Now() + finishedAt = time.Now() + + log.Infof("Exit Code: %d", exitCode) + log.Debugf("Stdout: %s", stdOut) + log.Debugf("Stderr: %s", stdErr) + log.Debugf("Starting Time: %s", startedAt.Format(TimeStampLayout)) + log.Debugf("End Time: %s", finishedAt.Format(TimeStampLayout)) + return +} + +func RunContinuousCmd(fn func(chunk string) error, cmdArg string, thresholdTime int) (int, error) { + log.Debug("RunContinuousCmd") // Saves script/command in a temp file cmdFileName := strings.Join([]string{time.Now().Format(TimeLayoutYYYYMMDDHHMMSS), "_", RandomString(10)}, "") @@ -203,24 +249,21 @@ func RunContinuousReportRun(fn func(chunk string) error, cmdArg string, threshol } chunk := "" - nLines, nTime, nBytes := 0, 0, 0 + nTime := 0 timeStart := time.Now() scanner := bufio.NewScanner(bufio.NewReader(stdout)) for scanner.Scan() { chunk = strings.Join([]string{chunk, scanner.Text(), "\n"}, "") - nLines++ nTime = int(time.Now().Sub(timeStart).Seconds()) - nBytes = len(chunk) - if nLines >= thresholds.Lines || nTime >= thresholds.Time || nBytes >= thresholds.Bytes { + if nTime >= thresholdTime { if err := fn(chunk); err != nil { - nLines, nTime = 0, 0 - timeStart = time.Now() + nTime = 0 } else { chunk = "" - nLines, nTime, nBytes = 0, 0, 0 - timeStart = time.Now() + nTime = 0 } + timeStart = time.Now() } } @@ -230,9 +273,9 @@ func RunContinuousReportRun(fn func(chunk string) error, cmdArg string, threshol } if len(chunk) > 0 { - log.Debug("Sending the last pending chunk") + log.Debug("Processing the last pending chunk") if err := fn(chunk); err != nil { - log.Error("Cannot send the last chunk", err.Error()) + log.Error("Cannot process the last chunk", err.Error()) } }