Skip to content
This repository has been archived by the owner on Nov 8, 2019. It is now read-only.

Commit

Permalink
Improvements and process adjustments (issue #14)
Browse files Browse the repository at this point in the history
- Refactor Marshal/Unmarshal
- Polling: new strategy implemented with 2 time cycling: longTime/shortTime (Defaults: 30/5 seconds).
Looking for a better performance and not to overload the API channel:
Short time lapse period in order to avoid long waits when commands are pending.
Long time lapse period for waiting new commands along polling life cycle.

- exec:
Polling: new command which executes the received command and manages two output pipes (output and error). It shouldn't throw any exception/error or stop the process at that level
Continuous report: only threshold Time is required

Closes #14
  • Loading branch information
Pablo Cantera committed Dec 29, 2017
1 parent 807939e commit b48a0f5
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 99 deletions.
29 changes: 14 additions & 15 deletions cmdpolling/continuousreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@ package cmdpolling
import (
"errors"
"fmt"
"time"
"os"
"encoding/json"
"time"

log "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/ingrammicro/concerto/api/types"
"github.com/ingrammicro/concerto/cmd"
"github.com/ingrammicro/concerto/utils"
"github.com/ingrammicro/concerto/utils/format"
)

const (
RetriesNumber = 5
RetriesFactor = 3
RetriesNumber = 5
RetriesFactor = 3
DefaultThresholdTime = 10
)

func cmdContinuousReportRun(c *cli.Context) error {
Expand All @@ -34,22 +33,22 @@ func cmdContinuousReportRun(c *cli.Context) error {
formatter.PrintFatal("argument missing", errors.New("a script or command is required"))
}

// cli command thresholds flags
thresholds := utils.Thresholds{Lines: c.Int("lines"), Time: c.Int("time"), Bytes: c.Int("bytes")}
// cli command threshold flag
thresholdTime := c.Int("time")
if !(thresholdTime > 0) {
thresholdTime = DefaultThresholdTime
}
log.Debug("Time threshold:", thresholdTime)

// Custom method for chunks processing
fn := func(chunk string) error {
log.Debug("sendChunks")

err := retry(RetriesNumber, time.Second, func() error {
log.Debug("Sending: ", chunk)
command := types.PollingContinuousReport{
Stdout: chunk,
}

var commandIn map[string]interface{}
inRec, _ := json.Marshal(command)
json.Unmarshal(inRec, &commandIn)
commandIn := map[string]interface{}{
"stdout": chunk,
}

_, statusCode, err := pollingSvc.ReportBootstrapLog(&commandIn)
switch {
Expand All @@ -71,7 +70,7 @@ func cmdContinuousReportRun(c *cli.Context) error {
return nil
}

exitCode, err := utils.RunContinuousReportRun(fn, cmdArg, thresholds)
exitCode, err := utils.RunContinuousCmd(fn, cmdArg, thresholdTime)
if err != nil {
formatter.PrintFatal("cannot process continuous report command", err)
}
Expand Down
107 changes: 70 additions & 37 deletions cmdpolling/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os/signal"
"syscall"
"time"
"encoding/json"
"strings"

log "github.com/Sirupsen/logrus"
Expand All @@ -18,8 +17,9 @@ import (
)

const (
DefaultPollingPingTimingInterval = 30
ProcessIdFile = "imco-polling.pid"
DefaultPollingPingTimingIntervalLong = 30
DefaultPollingPingTimingIntervalShort = 5
ProcessIdFile = "imco-polling.pid"
)

var (
Expand All @@ -37,8 +37,8 @@ func handleSysSignals(cancelFunc context.CancelFunc) {
}

// Returns the full path to the tmp folder joined with pid management file name
func getProcessIdFilePath() string{
return strings.Join([]string{os.TempDir(), string(os.PathSeparator), ProcessIdFile}, "")
func getProcessIdFilePath() string {
return strings.Join([]string{os.TempDir(), string(os.PathSeparator), ProcessIdFile}, "")
}

// Start the polling process
Expand All @@ -50,18 +50,24 @@ func cmdStart(c *cli.Context) error {
formatter.PrintFatal("cannot create the pid file", err)
}

pollingPingTimingInterval := c.Int64("time")
if !(pollingPingTimingInterval > 0) {
pollingPingTimingInterval = DefaultPollingPingTimingInterval
pollingPingTimingIntervalLong := c.Int64("longTime")
if !(pollingPingTimingIntervalLong > 0) {
pollingPingTimingIntervalLong = DefaultPollingPingTimingIntervalLong
}
log.Debug("Ping time interval:", pollingPingTimingInterval)
log.Debug("Ping long time interval:", pollingPingTimingIntervalLong)

pollingPingTimingIntervalShort := c.Int64("shortTime")
if !(pollingPingTimingIntervalShort > 0) {
pollingPingTimingIntervalShort = DefaultPollingPingTimingIntervalShort
}
log.Debug("Ping short time interval:", pollingPingTimingIntervalShort)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go handleSysSignals(cancel)

pingRoutine(ctx, c, pollingPingTimingInterval)
pingRoutine(ctx, c, pollingPingTimingIntervalLong, pollingPingTimingIntervalShort)

return nil
}
Expand All @@ -80,41 +86,69 @@ func cmdStop(c *cli.Context) error {
}

// Main polling background routine
func pingRoutine(ctx context.Context, c *cli.Context, pollingPingTimingInterval int64) {
func pingRoutine(ctx context.Context, c *cli.Context, longTimePeriod int64, shortTimePeriod int64) {
log.Debug("pingRoutine")

formatter := format.GetFormatter()
pollingSvc := cmd.WireUpPolling(c)

// initialization
isRunningCommandRoutine := false
t := time.NewTicker(time.Duration(pollingPingTimingInterval) * time.Second)
longTicker := time.NewTicker(time.Duration(longTimePeriod) * time.Second)
shortTicker := time.NewTicker(time.Duration(shortTimePeriod) * time.Second)
currentTicker := longTicker
for {
log.Debug("Requesting for candidate commands status")
ping, status, err := pollingSvc.Ping()
if err != nil {
formatter.PrintError("Couldn't receive polling ping data", err)
} else {
// One command is available, and no process running
if status == 201 && ping.PendingCommands && !isRunningCommandRoutine {
log.Debug("Detected a candidate command")
isRunningCommandRoutine = true
go processingCommandRoutine(pollingSvc, formatter)
// no need to request until current command ends
if !isRunningCommandRoutine {
log.Debug("Requesting for candidate commands status")
ping, status, err := pollingSvc.Ping()
if err != nil {
formatter.PrintError("Couldn't receive polling ping data", err)
// in low level error, should the ticker set as the the longest time interval?
} else {
// One command is available
if status == 201 {
if ping.PendingCommands {
log.Debug("Detected a candidate command")
isRunningCommandRoutine = true
// set short interval timing!?
// - by default this implies next interval timing sil be short.
// - If command routine has a over delay, long interval timing will be assigned later
if currentTicker != shortTicker {
log.Debug("Ticker assigned: Short")
shortTicker = time.NewTicker(time.Duration(shortTimePeriod) * time.Second)
currentTicker = shortTicker
}
go processingCommandRoutine(pollingSvc, formatter)
} else {
// Since no pending command, long interval timing
log.Debug("Ticker assigned: Long")
currentTicker = time.NewTicker(time.Duration(longTimePeriod) * time.Second)
}
}
}
}

select {
case <-commandProcessed:
isRunningCommandRoutine = false
default:
}
log.Debug("Waiting...", currentTicker)

select {
case <-t.C:
case <-currentTicker.C:
case <-ctx.Done():
log.Debug(ctx.Err())
log.Debug("closing polling")
return
}

select {
case <-commandProcessed:
isRunningCommandRoutine = false
default:
// if command routine is currently running and short interval timing runs out, a long interval timing is assigned
if isRunningCommandRoutine && currentTicker == shortTicker {
log.Debug("Ticker re-assigned: Long")
currentTicker = time.NewTicker(time.Duration(longTimePeriod) * time.Second)
}
}
}
}

Expand All @@ -132,19 +166,18 @@ func processingCommandRoutine(pollingSvc *polling.PollingService, formatter form
// 2. Execute the retrieved command
if status == 200 {
log.Debug("Running the retrieved command")
command.Stdout, command.ExitCode, _, _ = utils.RunCmd(command.Script)
command.Stderr = ""
if command.ExitCode != 0 {
command.Stderr = command.Stdout
command.Stdout = ""
}
command.ExitCode, command.Stdout, command.Stderr, _, _ = utils.RunTracedCmd(command.Script)

// 3. then status is propagated to IMCO
log.Debug("Reporting command execution status")

var commandIn map[string]interface{}
inRec, _ := json.Marshal(command)
json.Unmarshal(inRec, &commandIn)
commandIn := map[string]interface{}{
"id": command.Id,
"script": command.Script,
"stdout": command.Stdout,
"stderr": command.Stderr,
"exit_code": command.ExitCode,
}

_, status, err := pollingSvc.UpdateCommand(&commandIn, command.Id)
if err != nil {
Expand Down
30 changes: 12 additions & 18 deletions cmdpolling/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmdpolling

import (
"github.com/codegangsta/cli"
"github.com/ingrammicro/concerto/utils"
)

func SubCommands() []cli.Command {
Expand All @@ -18,9 +17,14 @@ func SubCommands() []cli.Command {
Action: cmdStart,
Flags: []cli.Flag{
cli.Int64Flag{
Name: "time, t",
Usage: "Polling ping time interval (seconds)",
Value: 30,
Name: "longTime, l",
Usage: "Polling ping long time interval (seconds)",
Value: DefaultPollingPingTimingIntervalLong,
},
cli.Int64Flag{
Name: "shortTime, s",
Usage: "Polling ping short time interval (seconds)",
Value: DefaultPollingPingTimingIntervalShort,
},
},
},
Expand All @@ -30,25 +34,15 @@ func SubCommands() []cli.Command {
Action: cmdStop,
},
{
Name: "continuous-report-run",
Usage: "Runs a script and gradually report its output",
Action: cmdContinuousReportRun,
Name: "continuous-report-run",
Usage: "Runs a script and gradually report its output",
Action: cmdContinuousReportRun,
ArgsUsage: "script",
Flags: []cli.Flag{
cli.IntFlag{
Name: "lines, l",
Usage: "Maximum lines threshold per response chunk",
Value: utils.DefaultThresholdLines,
},
cli.IntFlag{
Name: "time, t",
Usage: "Maximum time -seconds- threshold per response chunk",
Value: utils.DefaultThresholdTime,
},
cli.IntFlag{
Name: "bytes, b",
Usage: "Maximum bytes threshold per response chunk",
Value: utils.DefaultThresholdBytes,
Value: DefaultThresholdTime,
},
},
},
Expand Down
Loading

0 comments on commit b48a0f5

Please sign in to comment.