-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmerge.go
123 lines (110 loc) · 3.6 KB
/
merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strings"
"sync"
)
const COLOR_FORMAT string = "\033[3%dm%s\033[0m"
const NUM_COLORS = 6
type message struct {
content string
sender int
destination *os.File
}
func (output message) String() string {
if isTerminal(output.destination) {
return fmt.Sprintf(COLOR_FORMAT, (output.sender%NUM_COLORS)+1, output.content)
}
return output.content
}
var isTerminalCache = make(map[*os.File]bool) // Avoid making a Stat syscall for every message
func isTerminal(file *os.File) bool {
if fileIsTerminal, ok := isTerminalCache[file]; ok {
return fileIsTerminal
}
info, err := file.Stat()
isTerminalCache[file] = (err == nil) && (info.Mode()&os.ModeCharDevice != 0)
return isTerminalCache[file]
}
var TRIM_COLORS_PATTERN *regexp.Regexp = regexp.MustCompile("\033\\[[^m]*m")
func readPipe(pipe io.ReadCloser, destination *os.File, id int, mergedOutput chan message, wait *sync.WaitGroup) {
defer wait.Done()
output := bufio.NewScanner(pipe)
for output.Scan() {
mergedOutput <- message{TRIM_COLORS_PATTERN.ReplaceAllString(output.Text(), ""), id, destination}
}
}
func identifier(command *exec.Cmd) string {
fullId := strings.Join(command.Args, " ")
if len(fullId) <= 60 {
return fullId
}
return fullId[:60] + "..."
}
// Sends both stdout and stderror of command to mergedOutput
func mergeOutErr(command *exec.Cmd, id int, mergedOutput chan message) {
stdout, err := command.StdoutPipe()
if err != nil {
fatal(fmt.Sprintf("Could not connect to stdout of process '%s'. Details: %s", identifier(command), err))
}
stderror, err := command.StderrPipe()
if err != nil {
fatal(fmt.Sprintf("Could not connect to stderr of process '%s'. Details: %s", identifier(command), err))
}
if err := command.Start(); err != nil {
fatal(fmt.Sprintf("Could not start process '%s'. Details: %s", identifier(command), err))
} else {
mergedOutput <- message{fmt.Sprintf("Started '%s' (%d)", identifier(command), command.Process.Pid), id, os.Stdout}
}
var both sync.WaitGroup
both.Add(2)
go readPipe(stdout, os.Stdout, id, mergedOutput, &both)
go readPipe(stderror, os.Stderr, id, mergedOutput, &both)
both.Wait()
close(mergedOutput)
}
func listenTo(command *exec.Cmd, id int, messageBuffer chan message, processWait *sync.WaitGroup) {
defer processWait.Done()
mergedOutput := make(chan message)
go mergeOutErr(command, id, mergedOutput)
for message := range mergedOutput {
messageBuffer <- message
}
if err, ok := (command.Wait()).(*exec.ExitError); err != nil {
if ok {
messageBuffer <- message{fmt.Sprintf("Process '%s' (%d) exited with status %d", identifier(command), command.Process.Pid, err.ExitCode()), id, os.Stdout}
} else {
messageBuffer <- message{fmt.Sprintf("Process '%s' (%d) exited abnormally. Details: %s", identifier(command), command.Process.Pid, err), id, os.Stdout}
}
} else {
messageBuffer <- message{fmt.Sprintf("Process '%s' (%d) exited successfully", identifier(command), command.Process.Pid), id, os.Stdout}
}
}
func fatal(errorMessage string) {
fmt.Fprintln(os.Stderr, fmt.Sprintf("Error: %s", errorMessage))
os.Exit(1)
}
func main() {
if len(os.Args) < 3 {
fatal("Must supply at least two processes to run")
}
var processWait sync.WaitGroup
processWait.Add(len(os.Args) - 1)
messageBuffer := make(chan message)
for i, cmd := range os.Args[1:] {
fields := strings.Fields(cmd)
go listenTo(exec.Command(fields[0], fields[1:]...), i, messageBuffer, &processWait)
}
go (func() {
processWait.Wait()
close(messageBuffer)
})()
for output := range messageBuffer {
fmt.Fprintln(output.destination, output)
}
}