Skip to content

Commit

Permalink
fix buffer race issues
Browse files Browse the repository at this point in the history
  • Loading branch information
sc-zenokerr committed Jun 18, 2024
1 parent 6b9a28c commit e38b16c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
61 changes: 45 additions & 16 deletions cmd/acadock-monitoring/isgraceful/isgraceful.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package isgraceful
import (
"bytes"
"errors"
"io"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"
"testing"
"time"
Expand All @@ -15,10 +17,14 @@ import (
)

type CmdAndOutput struct {
t *testing.T
Cmd *exec.Cmd
Output *bytes.Buffer
pid int
t *testing.T
Cmd *exec.Cmd
pid int

output *bytes.Buffer
outputMu sync.Mutex
oldStdout io.Writer
oldStderr io.Writer

// shutdownWaitDuration is the duration which is waited for all connections to stop
shutdownWaitDuration time.Duration
Expand All @@ -39,7 +45,7 @@ func NewCmdAndOutput(t *testing.T, options ...func(*CmdAndOutput)) *CmdAndOutput
t.Helper()
c := &CmdAndOutput{
t: t,
Output: new(bytes.Buffer),
output: new(bytes.Buffer),
startWaitDuration: 100 * time.Millisecond,
upgradeWaitDuration: 30 * time.Second,
shutdownWaitDuration: 60 * time.Second,
Expand All @@ -57,10 +63,10 @@ func WithCmd(cmd *exec.Cmd) func(*CmdAndOutput) {
}
}

// WithOutput sets the Output field of the CmdAndOutput struct
// WithOutput sets the output field of the CmdAndOutput struct
func WithOutput(stdout *bytes.Buffer) func(*CmdAndOutput) {
return func(c *CmdAndOutput) {
c.Output = stdout
c.output = stdout
}
}

Expand Down Expand Up @@ -96,11 +102,6 @@ func WithShutdownWaitDuration(duration time.Duration) func(output *CmdAndOutput)
func (c *CmdAndOutput) Signal(signal os.Signal) {
c.t.Helper()

// get pid from pid file
if c.pidFile != "" {
c.pid = c.readPidFile()
}

err := c.findProcess().Signal(signal)
if err != nil {
c.t.Fatalf("send signal %v: %v", signal, err)
Expand All @@ -110,8 +111,29 @@ func (c *CmdAndOutput) Signal(signal os.Signal) {
// Start starts the process
func (c *CmdAndOutput) Start() {
c.t.Helper()
c.Cmd.Stdout = c.Output
c.Cmd.Stderr = c.Output

c.oldStdout = c.Cmd.Stdout
c.oldStderr = c.Cmd.Stderr
r, w, _ := os.Pipe()
c.Cmd.Stdout = w
c.Cmd.Stderr = w

// Read from pipe and append to buffer with locking
go func() {
b := make([]byte, 1024)
for {
n, err := r.Read(b)
if n > 0 {
c.outputMu.Lock()
c.output.Write(b[:n])
c.outputMu.Unlock()
}
if err != nil {
break
}
}
}()

err := c.Cmd.Start()
if err != nil {
c.t.Fatalf("failed to start process: %v", err)
Expand Down Expand Up @@ -152,7 +174,7 @@ func (c *CmdAndOutput) IsStoppedAfter(timeout time.Duration) {
time.Sleep(timeout)

// Nil if the process running
require.NotNilf(c.t, c.findProcess(), "process %v was up after %v, output: \n\n%v", c.Cmd, timeout, c.Output.String())
require.NotNilf(c.t, c.findProcess(), "process %v was up after %v", c.Cmd, timeout)
}

// IsRunningAfter checks if the process is running after a certain duration
Expand All @@ -171,7 +193,14 @@ func (c *CmdAndOutput) IsRunningAfter(timeout time.Duration) {

// Nil if the process running
require.NoErrorf(c.t, c.findProcess().Signal(syscall.Signal(0)),
"process %v is dead after %v, status: %v, output: \n\n%v", c.Cmd.Args, timeout, processState, c.Output.String())
"process %v is dead after %v, status: %v", c.Cmd.Args, timeout, processState)
}

// GetOutput returns the output of the process
func (c *CmdAndOutput) GetOutput() string {
c.outputMu.Lock()
defer c.outputMu.Unlock()
return c.output.String()
}

func (c *CmdAndOutput) readPidFile() int {
Expand Down
8 changes: 4 additions & 4 deletions cmd/acadock-monitoring/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestService_Shutdown(t *testing.T) {
isGraceful.IsStoppedAfter(shutdownTimeout)

// Check the output
output := isGraceful.Output.String()
output := isGraceful.GetOutput()
t.Log(output)
require.Equal(t, 1, strings.Count(output, "parent exited"))
require.Zero(t, strings.Count(output, "upgrade requested"))
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestService_Restart(t *testing.T) {
isGraceful.IsRunningAfter(upgradeTimeout)

// Check the output
output := isGraceful.Output.String()
output := isGraceful.GetOutput()
t.Log(output)
require.Equal(t, 1, strings.Count(output, "upgrade requested"))
require.Zero(t, strings.Count(output, "upgrade failed"))
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestService_Restart_Twice(t *testing.T) {
isGraceful.IsRunningAfter(upgradeTimeout)

// Check the output
output := isGraceful.Output.String()
output := isGraceful.GetOutput()
t.Log(output)
require.Equal(t, 1, strings.Count(output, "upgrade requested"))
require.Zero(t, strings.Count(output, "upgrade failed"))
Expand All @@ -153,7 +153,7 @@ func TestService_Restart_Twice(t *testing.T) {
isGraceful.IsRunningAfter(upgradeTimeout)

// Check the output
output = isGraceful.Output.String()
output = isGraceful.GetOutput()
t.Log(output)
require.Equal(t, 2, strings.Count(output, "upgrade requested"))
require.Zero(t, strings.Count(output, "upgrade failed"))
Expand Down

0 comments on commit e38b16c

Please sign in to comment.