Skip to content
This repository has been archived by the owner on Nov 8, 2019. It is now read-only.

Commit

Permalink
Refactor the report run algorithm (issue #14)
Browse files Browse the repository at this point in the history
Closes #14
  • Loading branch information
pcantera committed Dec 4, 2017
1 parent 8a7ea5b commit ee9daa4
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 143 deletions.
171 changes: 33 additions & 138 deletions cmdpolling/continuousreport.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,23 @@
package cmdpolling

import (
"bufio"
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"runtime"
"strings"
"time"

log "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/ingrammicro/concerto/api/polling"
"github.com/ingrammicro/concerto/api/types"
"github.com/ingrammicro/concerto/cmd"
"github.com/ingrammicro/concerto/utils"
"github.com/ingrammicro/concerto/utils/format"
)

const (
DefaultThresholdLines = 10
DefaultThresholdTime = 10
DefaultThresholdBytes = 500
RetriesNumber = 5
RetriesFactor = 3
RetriesNumber = 5
RetriesFactor = 3
)

type Threshold struct {
lines int
time int
bytes int
}

func cmdContinuousReportRun(c *cli.Context) error {
log.Debug("cmdContinuousReportRun")

Expand All @@ -49,104 +33,47 @@ func cmdContinuousReportRun(c *cli.Context) error {
}

// cli command thresholds flags
thresholdLines := c.Int("lines")
if !(thresholdLines > 0) {
thresholdLines = DefaultThresholdLines
}
thresholdTime := c.Int("time")
if !(thresholdTime > 0) {
thresholdTime = DefaultThresholdTime
}
thresholdBytes := c.Int("bytes")
if !(thresholdBytes > 0) {
thresholdBytes = DefaultThresholdBytes
}
threshold := Threshold{lines: thresholdLines, time: thresholdTime, bytes: thresholdBytes}
log.Debug("Threshold", threshold)

if err := processCommand(pollingSvc, cmdArg, threshold); err != nil {
formatter.PrintFatal("cannot process continuous report command", err)
}

log.Info("completed")
return nil
}

func processCommand(pollingSvc *polling.PollingService, cmdArg string, threshold Threshold) error {
log.Debug("processCommand")
thresholds := utils.Thresholds{Lines: c.Int("lines"), Time: c.Int("time"), Bytes: c.Int("bytes")}

// Saves script/command in a temp file
cmdFileName := strings.Join([]string{time.Now().Format("20060102150405"), "_", utils.RandomString(10)}, "")
if runtime.GOOS == "windows" {
cmdFileName = strings.Join([]string{cmdFileName, ".bat"}, "")
}
// Custom method for chunks processing
fn := func(chunk string) error {
log.Debug("sendChunks")

// Writes content
if err := ioutil.WriteFile(cmdFileName, []byte(cmdArg), 0600); err != nil {
log.Fatalf("Error creating temp file : ", err)
}
err := retry(RetriesNumber, time.Second, func() error {
log.Debug("Sending: ", chunk)
command := types.PollingContinuousReport{
Stdout: chunk,
}
commandIn, err := utils.ItemConvertParamsWithTagAsID(command)
if err != nil {
return fmt.Errorf("error parsing payload %v", err)
}

// Creates command
var newCmd *exec.Cmd
if runtime.GOOS == "windows" {
newCmd = exec.Command("cmd", "/C", cmdFileName)
} else {
newCmd = exec.Command("/bin/sh", cmdFileName)
}
_, statusCode, err := pollingSvc.ReportBootstrapLog(commandIn)
switch {
// 0<100 error cases??
case statusCode == 0:
return fmt.Errorf("communication error %v %v", statusCode, err)
case statusCode >= 500:
return fmt.Errorf("server error %v %v", statusCode, err)
case statusCode >= 400:
return fmt.Errorf("client error %v %v", statusCode, err)
default:
return nil
}
})

// Removes temp file
defer func() {
err := os.Remove(cmdFileName)
if err != nil {
log.Warn("Temp file cannot be removed", err.Error())
return fmt.Errorf("cannot send the chunk data, %v", err)
}
}()

// Gets the pipe command
stdout, err := newCmd.StdoutPipe()
if err != nil {
return fmt.Errorf("cannot get pipe command %v", err)
return nil
}
log.Info("==> Executing: ", strings.Join(newCmd.Args, " "))

// Start command asynchronously
if err = newCmd.Start(); err != nil {
return fmt.Errorf("cannot start the specified command %v", err)
}

chunk := ""
nLines, nTime, nBytes := 0, 0, 0
timeStart := time.Now()

scanner := bufio.NewScanner(bufio.NewReader(stdout))
for scanner.Scan() {
chunk = strings.Join([]string{chunk, scanner.Text(), "\n"}, "")
nLines++
nTime = int(time.Now().Sub(timeStart).Seconds())
nBytes = len(chunk)
if nLines >= threshold.lines || nTime >= threshold.time || nBytes >= threshold.bytes {
if err := sendChunks(pollingSvc, chunk); err != nil {
nLines, nTime = 0, 0
timeStart = time.Now()
} else {
chunk = ""
nLines, nTime, nBytes = 0, 0, 0
timeStart = time.Now()
}
}
}

if err := scanner.Err(); err != nil {
log.Error("==> Error: ", err.Error())
chunk = strings.Join([]string{chunk, err.Error()}, "")
if err := utils.RunContinuousReportRun(fn, cmdArg, thresholds); err != nil {
formatter.PrintFatal("cannot process continuous report command", err)
}

if len(chunk) > 0 {
log.Debug("Sending the last pending chunk")
if err := sendChunks(pollingSvc, chunk); err != nil {
return err
}
}
log.Info("completed")
return nil
}

Expand All @@ -163,35 +90,3 @@ func retry(attempts int, sleep time.Duration, fn func() error) error {
}
return nil
}

func sendChunks(pollingSvc *polling.PollingService, chunk string) error {
log.Debug("sendChunks")

err := retry(RetriesNumber, time.Second, func() error {
log.Debug("Sending: ", chunk)
command := types.PollingContinuousReport{
Stdout: chunk,
}
commandIn, err := utils.ItemConvertParamsWithTagAsID(command)
if err != nil {
return fmt.Errorf("error parsing payload %v", err)
}

_, statusCode, err := pollingSvc.ReportBootstrapLog(commandIn)
switch {
case statusCode == 0:
return fmt.Errorf("communication error %v %v", statusCode, err)
case statusCode >= 500:
return fmt.Errorf("server error %v %v", statusCode, err)
case statusCode >= 400:
return fmt.Errorf("client error %v %v", statusCode, err)
default:
return nil
}
})

if err != nil {
return fmt.Errorf("cannot send the chunk data, %v", err)
}
return nil
}
7 changes: 4 additions & 3 deletions cmdpolling/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmdpolling

import (
"github.com/codegangsta/cli"
"github.com/ingrammicro/concerto/utils"
)

func SubCommands() []cli.Command {
Expand Down Expand Up @@ -37,17 +38,17 @@ func SubCommands() []cli.Command {
cli.IntFlag{
Name: "lines, l",
Usage: "Maximum lines threshold per response chunk",
Value: DefaultThresholdLines,
Value: utils.DefaultThresholdLines,
},
cli.IntFlag{
Name: "time, t",
Usage: "Maximum time -seconds- threshold per response chunk",
Value: DefaultThresholdTime,
Value: utils.DefaultThresholdTime,
},
cli.IntFlag{
Name: "bytes, b",
Usage: "Maximum bytes threshold per response chunk",
Value: DefaultThresholdBytes,
Value: utils.DefaultThresholdBytes,
},
},
},
Expand Down
106 changes: 104 additions & 2 deletions utils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,32 @@ import (
"bufio"
"bytes"
"fmt"
log "github.com/Sirupsen/logrus"
"io"
"io/ioutil"
"os"
"os/exec"
"runtime"
"strings"
"syscall"
"time"

log "github.com/Sirupsen/logrus"
)

const (
TimeStampLayout = "2006-01-02T15:04:05.000000-07:00"
TimeStampLayout = "2006-01-02T15:04:05.000000-07:00"
TimeLayoutYYYYMMDDHHMMSS = "20060102150405"
DefaultThresholdLines = 10
DefaultThresholdTime = 10
DefaultThresholdBytes = 500
)

type Thresholds struct {
Lines int
Time int
Bytes int
}

func extractExitCode(err error) int {
if err != nil {
switch err.(type) {
Expand Down Expand Up @@ -135,3 +147,93 @@ func RunCmd(command string) (output string, exitCode int, startedAt time.Time, f
log.Infof("Exit Code: %d", exitCode)
return
}

func RunContinuousReportRun(fn func(chunk string) error, cmdArg string, thresholds Thresholds) error {
log.Debug("RunContinuousReportRun")

// command thresholds flags
if !(thresholds.Lines > 0) {
thresholds.Lines = DefaultThresholdLines
}
if !(thresholds.Time > 0) {
thresholds.Time = DefaultThresholdTime
}
if !(thresholds.Bytes > 0) {
thresholds.Bytes = DefaultThresholdBytes
}
log.Debug("Threshold", thresholds)

// Saves script/command in a temp file
cmdFileName := strings.Join([]string{time.Now().Format(TimeLayoutYYYYMMDDHHMMSS), "_", RandomString(10)}, "")
if runtime.GOOS == "windows" {
cmdFileName = strings.Join([]string{cmdFileName, ".bat"}, "")
}

// Writes content
if err := ioutil.WriteFile(cmdFileName, []byte(cmdArg), 0600); err != nil {
log.Fatalf("Error creating temp file : ", err)
}

// Creates command
var newCmd *exec.Cmd
if runtime.GOOS == "windows" {
newCmd = exec.Command("cmd", "/C", cmdFileName)
} else {
newCmd = exec.Command("/bin/sh", cmdFileName)
}

// Removes temp file
defer func() {
err := os.Remove(cmdFileName)
if err != nil {
log.Warn("Temp file cannot be removed", err.Error())
}
}()

// Gets the pipe command
stdout, err := newCmd.StdoutPipe()
if err != nil {
return fmt.Errorf("cannot get pipe command %v", err)
}
log.Info("==> Executing: ", strings.Join(newCmd.Args, " "))

// Start command asynchronously
if err = newCmd.Start(); err != nil {
return fmt.Errorf("cannot start the specified command %v", err)
}

chunk := ""
nLines, nTime, nBytes := 0, 0, 0
timeStart := time.Now()

scanner := bufio.NewScanner(bufio.NewReader(stdout))
for scanner.Scan() {
chunk = strings.Join([]string{chunk, scanner.Text(), "\n"}, "")
nLines++
nTime = int(time.Now().Sub(timeStart).Seconds())
nBytes = len(chunk)
if nLines >= thresholds.Lines || nTime >= thresholds.Time || nBytes >= thresholds.Bytes {
if err := fn(chunk); err != nil {
nLines, nTime = 0, 0
timeStart = time.Now()
} else {
chunk = ""
nLines, nTime, nBytes = 0, 0, 0
timeStart = time.Now()
}
}
}

if err := scanner.Err(); err != nil {
log.Error("==> Error: ", err.Error())
chunk = strings.Join([]string{chunk, err.Error()}, "")
}

if len(chunk) > 0 {
log.Debug("Sending the last pending chunk")
if err := fn(chunk); err != nil {
return err
}
}
return nil
}

0 comments on commit ee9daa4

Please sign in to comment.