Skip to content
This repository has been archived by the owner on Nov 8, 2019. It is now read-only.

Commit

Permalink
Refactor and adjustments (issue #14)
Browse files Browse the repository at this point in the history
- Adjusting pingRoutine mechanism
- commandProcessed as parameter
- Processing command polling requires to save in file prior to execute. In addition, refactoring for reusing in commond methods

Closes #14
  • Loading branch information
Pablo Cantera committed Jan 24, 2018
1 parent b48a0f5 commit 2057e3f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 81 deletions.
69 changes: 24 additions & 45 deletions cmdpolling/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ const (
ProcessIdFile = "imco-polling.pid"
)

var (
commandProcessed = make(chan bool, 1)
)

// Handle signals
func handleSysSignals(cancelFunc context.CancelFunc) {
log.Debug("handleSysSignals")
Expand Down Expand Up @@ -91,69 +87,52 @@ func pingRoutine(ctx context.Context, c *cli.Context, longTimePeriod int64, shor

formatter := format.GetFormatter()
pollingSvc := cmd.WireUpPolling(c)
commandProcessed := make(chan bool, 1)

// initialization
isRunningCommandRoutine := false
longTicker := time.NewTicker(time.Duration(longTimePeriod) * time.Second)
shortTicker := time.NewTicker(time.Duration(shortTimePeriod) * time.Second)
currentTicker := longTicker
for {
// no need to request until current command ends
if !isRunningCommandRoutine {
log.Debug("Requesting for candidate commands status")
ping, status, err := pollingSvc.Ping()
if err != nil {
formatter.PrintError("Couldn't receive polling ping data", err)
// in low level error, should the ticker set as the the longest time interval?
} else {
// One command is available
if status == 201 {
if ping.PendingCommands {
log.Debug("Detected a candidate command")
isRunningCommandRoutine = true
// set short interval timing!?
// - by default this implies next interval timing sil be short.
// - If command routine has a over delay, long interval timing will be assigned later
if currentTicker != shortTicker {
log.Debug("Ticker assigned: Short")
shortTicker = time.NewTicker(time.Duration(shortTimePeriod) * time.Second)
currentTicker = shortTicker
}
go processingCommandRoutine(pollingSvc, formatter)
} else {
// Since no pending command, long interval timing
log.Debug("Ticker assigned: Long")
currentTicker = time.NewTicker(time.Duration(longTimePeriod) * time.Second)
}
}
log.Debug("Requesting for candidate commands status")
ping, status, err := pollingSvc.Ping()
if err != nil {
formatter.PrintError("Couldn't receive polling ping data", err)
} else {
// One command is available, and no process running
if status == 201 && ping.PendingCommands && !isRunningCommandRoutine {
log.Debug("Detected a candidate command")
isRunningCommandRoutine = true
go processingCommandRoutine(pollingSvc, formatter, commandProcessed)
}
}

log.Debug("Waiting...", currentTicker)

select {
case <-commandProcessed:
isRunningCommandRoutine = false
if currentTicker != longTicker {
currentTicker.Stop()
}
log.Debug("Ticker assigned: short")
currentTicker = time.NewTicker(time.Duration(shortTimePeriod) * time.Second)
case <-currentTicker.C:
if currentTicker != longTicker {
currentTicker.Stop()
log.Debug("Ticker assigned: Long")
currentTicker = longTicker
}
case <-ctx.Done():
log.Debug(ctx.Err())
log.Debug("closing polling")
return
}

select {
case <-commandProcessed:
isRunningCommandRoutine = false
default:
// if command routine is currently running and short interval timing runs out, a long interval timing is assigned
if isRunningCommandRoutine && currentTicker == shortTicker {
log.Debug("Ticker re-assigned: Long")
currentTicker = time.NewTicker(time.Duration(longTimePeriod) * time.Second)
}
}
}
}

// Subsidiary routine for commands processing
func processingCommandRoutine(pollingSvc *polling.PollingService, formatter format.Formatter) {
func processingCommandRoutine(pollingSvc *polling.PollingService, formatter format.Formatter, commandProcessed chan bool) {
log.Debug("processingCommandRoutine")

// 1. Request for the new command available
Expand Down
78 changes: 42 additions & 36 deletions utils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,46 @@ func RunCmd(command string) (output string, exitCode int, startedAt time.Time, f
return
}

// RunTracedCmd executes the received command and manages two output pipes (output and error)
// It shouldn't throw any exception/error or stop the process.
func RunTracedCmd(command string) (exitCode int, stdOut string, stdErr string, startedAt time.Time, finishedAt time.Time) {
// Save script/command in a temp file
func createCommandWithFilename(command string) (cmd *exec.Cmd, cmdFileName string) {

var cmd *exec.Cmd
cmdFileName = strings.Join([]string{time.Now().Format(TimeLayoutYYYYMMDDHHMMSS), "_", RandomString(10)}, "")
if runtime.GOOS == "windows" {
cmdFileName = strings.Join([]string{cmdFileName, ".bat"}, "")
}

// Writes content to file
if err := ioutil.WriteFile(cmdFileName, []byte(command), 0600); err != nil {
log.Fatalf("Error creating temp file: %v", err)
}

// Creates command
if runtime.GOOS == "windows" {
log.Infof("Command: %s", command)
cmd = exec.Command("cmd", "/C", command)
cmd = exec.Command("cmd", "/C", cmdFileName)
} else {
log.Infof("Command: %s %s", "/bin/sh -c", command)
cmd = exec.Command("/bin/sh", "-c", command)
cmd = exec.Command("/bin/sh", cmdFileName)
}
return
}

// Remove temp file
func deleteTmpCommandFilename(cmdFileName string) {
err := os.Remove(cmdFileName)
if err != nil {
log.Warn("Temp file cannot be removed", err.Error())
}
}

// RunTracedCmd executes the received command and manages two output pipes (output and error)
// It shouldn't throw any exception/error or stop the process.
func RunTracedCmd(command string) (exitCode int, stdOut string, stdErr string, startedAt time.Time, finishedAt time.Time) {
log.Debug("RunTracedCmd")

// Saves script/command in a temp file
var cmd, cmdFileName = createCommandWithFilename(command)

// Removes temp file
defer deleteTmpCommandFilename(cmdFileName)

stdoutIn, err := cmd.StdoutPipe()
if err != nil {
Expand Down Expand Up @@ -206,45 +233,24 @@ func RunTracedCmd(command string) (exitCode int, stdOut string, stdErr string, s
return
}

func RunContinuousCmd(fn func(chunk string) error, cmdArg string, thresholdTime int) (int, error) {
func RunContinuousCmd(fn func(chunk string) error, command string, thresholdTime int) (int, error) {
log.Debug("RunContinuousCmd")

// Saves script/command in a temp file
cmdFileName := strings.Join([]string{time.Now().Format(TimeLayoutYYYYMMDDHHMMSS), "_", RandomString(10)}, "")
if runtime.GOOS == "windows" {
cmdFileName = strings.Join([]string{cmdFileName, ".bat"}, "")
}

// Writes content
if err := ioutil.WriteFile(cmdFileName, []byte(cmdArg), 0600); err != nil {
log.Fatalf("Error creating temp file : ", err)
}

// Creates command
var newCmd *exec.Cmd
if runtime.GOOS == "windows" {
newCmd = exec.Command("cmd", "/C", cmdFileName)
} else {
newCmd = exec.Command("/bin/sh", cmdFileName)
}
var cmd, cmdFileName = createCommandWithFilename(command)

// Removes temp file
defer func() {
err := os.Remove(cmdFileName)
if err != nil {
log.Warn("Temp file cannot be removed", err.Error())
}
}()
defer deleteTmpCommandFilename(cmdFileName)

// Gets the pipe command
stdout, err := newCmd.StdoutPipe()
stdout, err := cmd.StdoutPipe()
if err != nil {
return 1, fmt.Errorf("cannot get pipe command %v", err)
}
log.Info("==> Executing: ", strings.Join(newCmd.Args, " "))
log.Info("==> Executing: ", strings.Join(cmd.Args, " "))

// Start command asynchronously
if err = newCmd.Start(); err != nil {
if err = cmd.Start(); err != nil {
return 1, fmt.Errorf("cannot start the specified command %v", err)
}

Expand Down Expand Up @@ -279,7 +285,7 @@ func RunContinuousCmd(fn func(chunk string) error, cmdArg string, thresholdTime
}
}

err = newCmd.Wait()
err = cmd.Wait()
exitCode := extractExitCode(err)

return exitCode, nil
Expand Down

0 comments on commit 2057e3f

Please sign in to comment.