From 8a7ea5beba891c7b4618159fd053fec43de030fe Mon Sep 17 00:00:00 2001
From: pcantera
Date: Thu, 30 Nov 2017 19:29:20 +0100
Subject: [PATCH] Adjust processing algorithm (issue #14)
- Saves the given script/command in a temp file; and run the file
- Avoid line break bytes exclusion, as well as the empty line scanned (if received)
- In error case while sending report, the process continues buffering the chunk data until next threshold
- When sending the last pending chunk and in a scanning context error; the chunk would include the error, which is concatenated to the previous chunk if available.
* At any case, the chunk has to contain bytes!
See #14
---
cmdpolling/continuousreport.go | 71 ++++++++++++++++++++++------------
utils/utils.go | 12 ++++++
2 files changed, 58 insertions(+), 25 deletions(-)
diff --git a/cmdpolling/continuousreport.go b/cmdpolling/continuousreport.go
index 93282e1..191cf85 100644
--- a/cmdpolling/continuousreport.go
+++ b/cmdpolling/continuousreport.go
@@ -4,6 +4,8 @@ import (
"bufio"
"errors"
"fmt"
+ "io/ioutil"
+ "os"
"os/exec"
"runtime"
"strings"
@@ -39,9 +41,9 @@ func cmdContinuousReportRun(c *cli.Context) error {
pollingSvc := cmd.WireUpPolling(c)
// cli command argument
- var commandArg string
+ var cmdArg string
if c.Args().Present() {
- commandArg = c.Args().First()
+ cmdArg = c.Args().First()
} else {
formatter.PrintFatal("argument missing", errors.New("a script or command is required"))
}
@@ -62,7 +64,7 @@ func cmdContinuousReportRun(c *cli.Context) error {
threshold := Threshold{lines: thresholdLines, time: thresholdTime, bytes: thresholdBytes}
log.Debug("Threshold", threshold)
- if err := processCommand(pollingSvc, commandArg, threshold); err != nil {
+ if err := processCommand(pollingSvc, cmdArg, threshold); err != nil {
formatter.PrintFatal("cannot process continuous report command", err)
}
@@ -70,16 +72,36 @@ func cmdContinuousReportRun(c *cli.Context) error {
return nil
}
-func processCommand(pollingSvc *polling.PollingService, commandArg string, threshold Threshold) error {
+func processCommand(pollingSvc *polling.PollingService, cmdArg string, threshold Threshold) error {
log.Debug("processCommand")
+ // Saves script/command in a temp file
+ cmdFileName := strings.Join([]string{time.Now().Format("20060102150405"), "_", utils.RandomString(10)}, "")
+ if runtime.GOOS == "windows" {
+ cmdFileName = strings.Join([]string{cmdFileName, ".bat"}, "")
+ }
+
+ // Writes content
+ if err := ioutil.WriteFile(cmdFileName, []byte(cmdArg), 0600); err != nil {
+ log.Fatalf("Error creating temp file : ", err)
+ }
+
+ // Creates command
var newCmd *exec.Cmd
if runtime.GOOS == "windows" {
- newCmd = exec.Command("cmd", "/C", commandArg)
+ newCmd = exec.Command("cmd", "/C", cmdFileName)
} else {
- newCmd = exec.Command("/bin/sh", "-c", commandArg)
+ newCmd = exec.Command("/bin/sh", cmdFileName)
}
+ // Removes temp file
+ defer func() {
+ err := os.Remove(cmdFileName)
+ if err != nil {
+ log.Warn("Temp file cannot be removed", err.Error())
+ }
+ }()
+
// Gets the pipe command
stdout, err := newCmd.StdoutPipe()
if err != nil {
@@ -92,37 +114,36 @@ func processCommand(pollingSvc *polling.PollingService, commandArg string, thres
return fmt.Errorf("cannot start the specified command %v", err)
}
- line := ""
+ chunk := ""
nLines, nTime, nBytes := 0, 0, 0
timeStart := time.Now()
scanner := bufio.NewScanner(bufio.NewReader(stdout))
for scanner.Scan() {
- line += scanner.Text()
- if len(line) > 0 {
- nLines++
- nTime = int(time.Now().Sub(timeStart).Seconds())
- nBytes = len(line)
- if nLines >= threshold.lines || nTime >= threshold.time || nBytes >= threshold.bytes {
- if err := sendChunks(pollingSvc, line); err != nil {
- return err
- } else {
- line = ""
- nLines, nTime, nBytes = 0, 0, 0
- timeStart = time.Now()
- }
+ chunk = strings.Join([]string{chunk, scanner.Text(), "\n"}, "")
+ nLines++
+ nTime = int(time.Now().Sub(timeStart).Seconds())
+ nBytes = len(chunk)
+ if nLines >= threshold.lines || nTime >= threshold.time || nBytes >= threshold.bytes {
+ if err := sendChunks(pollingSvc, chunk); err != nil {
+ nLines, nTime = 0, 0
+ timeStart = time.Now()
+ } else {
+ chunk = ""
+ nLines, nTime, nBytes = 0, 0, 0
+ timeStart = time.Now()
}
}
}
if err := scanner.Err(); err != nil {
log.Error("==> Error: ", err.Error())
- if err := sendChunks(pollingSvc, "error reading standard input:"+err.Error()); err != nil {
- return err
- }
- } else {
+ chunk = strings.Join([]string{chunk, err.Error()}, "")
+ }
+
+ if len(chunk) > 0 {
log.Debug("Sending the last pending chunk")
- if err := sendChunks(pollingSvc, line); err != nil {
+ if err := sendChunks(pollingSvc, chunk); err != nil {
return err
}
}
diff --git a/utils/utils.go b/utils/utils.go
index 6f61c1c..4b4ad2c 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -4,10 +4,12 @@ import (
"archive/zip"
"fmt"
"io"
+ "math/rand"
"os"
"path/filepath"
"regexp"
"strings"
+ "time"
"github.com/codegangsta/cli"
@@ -186,3 +188,13 @@ func CheckRequiredFlags(c *cli.Context, flags []string) {
os.Exit(2)
}
}
+
+func RandomString(strlen int) string {
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ const chars = "abcdefghijklmnopqrstuvwxyz0123456789"
+ result := make([]byte, strlen)
+ for i := range result {
+ result[i] = chars[r.Intn(len(chars))]
+ }
+ return string(result)
+}