diff --git a/cmdpolling/polling.go b/cmdpolling/polling.go index e3c290e..efd74ec 100644 --- a/cmdpolling/polling.go +++ b/cmdpolling/polling.go @@ -22,10 +22,6 @@ const ( ProcessIdFile = "imco-polling.pid" ) -var ( - commandProcessed = make(chan bool, 1) -) - // Handle signals func handleSysSignals(cancelFunc context.CancelFunc) { log.Debug("handleSysSignals") @@ -91,69 +87,52 @@ func pingRoutine(ctx context.Context, c *cli.Context, longTimePeriod int64, shor formatter := format.GetFormatter() pollingSvc := cmd.WireUpPolling(c) + commandProcessed := make(chan bool, 1) // initialization isRunningCommandRoutine := false longTicker := time.NewTicker(time.Duration(longTimePeriod) * time.Second) - shortTicker := time.NewTicker(time.Duration(shortTimePeriod) * time.Second) currentTicker := longTicker for { - // no need to request until current command ends - if !isRunningCommandRoutine { - log.Debug("Requesting for candidate commands status") - ping, status, err := pollingSvc.Ping() - if err != nil { - formatter.PrintError("Couldn't receive polling ping data", err) - // in low level error, should the ticker set as the the longest time interval? - } else { - // One command is available - if status == 201 { - if ping.PendingCommands { - log.Debug("Detected a candidate command") - isRunningCommandRoutine = true - // set short interval timing!? - // - by default this implies next interval timing sil be short. - // - If command routine has a over delay, long interval timing will be assigned later - if currentTicker != shortTicker { - log.Debug("Ticker assigned: Short") - shortTicker = time.NewTicker(time.Duration(shortTimePeriod) * time.Second) - currentTicker = shortTicker - } - go processingCommandRoutine(pollingSvc, formatter) - } else { - // Since no pending command, long interval timing - log.Debug("Ticker assigned: Long") - currentTicker = time.NewTicker(time.Duration(longTimePeriod) * time.Second) - } - } + log.Debug("Requesting for candidate commands status") + ping, status, err := pollingSvc.Ping() + if err != nil { + formatter.PrintError("Couldn't receive polling ping data", err) + } else { + // One command is available, and no process running + if status == 201 && ping.PendingCommands && !isRunningCommandRoutine { + log.Debug("Detected a candidate command") + isRunningCommandRoutine = true + go processingCommandRoutine(pollingSvc, formatter, commandProcessed) } } log.Debug("Waiting...", currentTicker) select { + case <-commandProcessed: + isRunningCommandRoutine = false + if currentTicker != longTicker { + currentTicker.Stop() + } + log.Debug("Ticker assigned: short") + currentTicker = time.NewTicker(time.Duration(shortTimePeriod) * time.Second) case <-currentTicker.C: + if currentTicker != longTicker { + currentTicker.Stop() + log.Debug("Ticker assigned: Long") + currentTicker = longTicker + } case <-ctx.Done(): log.Debug(ctx.Err()) log.Debug("closing polling") return } - - select { - case <-commandProcessed: - isRunningCommandRoutine = false - default: - // if command routine is currently running and short interval timing runs out, a long interval timing is assigned - if isRunningCommandRoutine && currentTicker == shortTicker { - log.Debug("Ticker re-assigned: Long") - currentTicker = time.NewTicker(time.Duration(longTimePeriod) * time.Second) - } - } } } // Subsidiary routine for commands processing -func processingCommandRoutine(pollingSvc *polling.PollingService, formatter format.Formatter) { +func processingCommandRoutine(pollingSvc *polling.PollingService, formatter format.Formatter, commandProcessed chan bool) { log.Debug("processingCommandRoutine") // 1. Request for the new command available diff --git a/utils/exec.go b/utils/exec.go index 249d7e8..b870432 100644 --- a/utils/exec.go +++ b/utils/exec.go @@ -139,19 +139,46 @@ func RunCmd(command string) (output string, exitCode int, startedAt time.Time, f return } -// RunTracedCmd executes the received command and manages two output pipes (output and error) -// It shouldn't throw any exception/error or stop the process. -func RunTracedCmd(command string) (exitCode int, stdOut string, stdErr string, startedAt time.Time, finishedAt time.Time) { +// Save script/command in a temp file +func createCommandWithFilename(command string) (cmd *exec.Cmd, cmdFileName string) { - var cmd *exec.Cmd + cmdFileName = strings.Join([]string{time.Now().Format(TimeLayoutYYYYMMDDHHMMSS), "_", RandomString(10)}, "") + if runtime.GOOS == "windows" { + cmdFileName = strings.Join([]string{cmdFileName, ".bat"}, "") + } + + // Writes content to file + if err := ioutil.WriteFile(cmdFileName, []byte(command), 0600); err != nil { + log.Fatalf("Error creating temp file: %v", err) + } + // Creates command if runtime.GOOS == "windows" { - log.Infof("Command: %s", command) - cmd = exec.Command("cmd", "/C", command) + cmd = exec.Command("cmd", "/C", cmdFileName) } else { - log.Infof("Command: %s %s", "/bin/sh -c", command) - cmd = exec.Command("/bin/sh", "-c", command) + cmd = exec.Command("/bin/sh", cmdFileName) } + return +} + +// Remove temp file +func deleteTmpCommandFilename(cmdFileName string) { + err := os.Remove(cmdFileName) + if err != nil { + log.Warn("Temp file cannot be removed", err.Error()) + } +} + +// RunTracedCmd executes the received command and manages two output pipes (output and error) +// It shouldn't throw any exception/error or stop the process. +func RunTracedCmd(command string) (exitCode int, stdOut string, stdErr string, startedAt time.Time, finishedAt time.Time) { + log.Debug("RunTracedCmd") + + // Saves script/command in a temp file + var cmd, cmdFileName = createCommandWithFilename(command) + + // Removes temp file + defer deleteTmpCommandFilename(cmdFileName) stdoutIn, err := cmd.StdoutPipe() if err != nil { @@ -206,45 +233,24 @@ func RunTracedCmd(command string) (exitCode int, stdOut string, stdErr string, s return } -func RunContinuousCmd(fn func(chunk string) error, cmdArg string, thresholdTime int) (int, error) { +func RunContinuousCmd(fn func(chunk string) error, command string, thresholdTime int) (int, error) { log.Debug("RunContinuousCmd") // Saves script/command in a temp file - cmdFileName := strings.Join([]string{time.Now().Format(TimeLayoutYYYYMMDDHHMMSS), "_", RandomString(10)}, "") - if runtime.GOOS == "windows" { - cmdFileName = strings.Join([]string{cmdFileName, ".bat"}, "") - } - - // Writes content - if err := ioutil.WriteFile(cmdFileName, []byte(cmdArg), 0600); err != nil { - log.Fatalf("Error creating temp file : ", err) - } - - // Creates command - var newCmd *exec.Cmd - if runtime.GOOS == "windows" { - newCmd = exec.Command("cmd", "/C", cmdFileName) - } else { - newCmd = exec.Command("/bin/sh", cmdFileName) - } + var cmd, cmdFileName = createCommandWithFilename(command) // Removes temp file - defer func() { - err := os.Remove(cmdFileName) - if err != nil { - log.Warn("Temp file cannot be removed", err.Error()) - } - }() + defer deleteTmpCommandFilename(cmdFileName) // Gets the pipe command - stdout, err := newCmd.StdoutPipe() + stdout, err := cmd.StdoutPipe() if err != nil { return 1, fmt.Errorf("cannot get pipe command %v", err) } - log.Info("==> Executing: ", strings.Join(newCmd.Args, " ")) + log.Info("==> Executing: ", strings.Join(cmd.Args, " ")) // Start command asynchronously - if err = newCmd.Start(); err != nil { + if err = cmd.Start(); err != nil { return 1, fmt.Errorf("cannot start the specified command %v", err) } @@ -279,7 +285,7 @@ func RunContinuousCmd(fn func(chunk string) error, cmdArg string, thresholdTime } } - err = newCmd.Wait() + err = cmd.Wait() exitCode := extractExitCode(err) return exitCode, nil