Skip to content

Commit

Permalink
refactor(spawn): improve process creation and error handling
Browse files Browse the repository at this point in the history
- Replace direct process creation with exec.Command for better control
- Simplify pipe creation and handling for stdout and stderr
- Remove redundant code and consolidate error checking
- Update file operations to use os.ReadFile instead of deprecated ioutil
- Remove unused celery-worker configuration from ecosystem.json
  • Loading branch information
dunstorm committed Aug 30, 2024
1 parent 312ae5e commit e045428
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 106 deletions.
4 changes: 2 additions & 2 deletions app/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package app

import (
"encoding/json"
"io/ioutil"
"os"

pb "github.com/dunstorm/pm2-go/proto"
"github.com/dunstorm/pm2-go/shared"
Expand All @@ -20,7 +20,7 @@ type Data struct {

func readFileJson(filePath string) ([]Data, error) {
// read file
content, err := ioutil.ReadFile(filePath)
content, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
Expand Down
8 changes: 0 additions & 8 deletions examples/ecosystem.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,5 @@
"scripts": ["logs_date"],
"executable_path": "python3",
"cron_restart": "* * * * *"
},
{
"name": "celery-worker",
"args": ["worker"],
"autorestart": false,
"cwd": "./examples",
"scripts": ["logs_date"],
"executable_path": "celery"
}
]
150 changes: 54 additions & 96 deletions shared/spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func (params *SpawnParams) createFiles() error {
}

func (params *SpawnParams) checkScripts() {
// check if we have a script to run
for _, script := range params.Scripts {
scriptPath := path.Join(utils.GetMainDirectory(), "scripts", script+".sh")
if _, err := os.Stat(scriptPath); os.IsNotExist(err) {
Expand All @@ -82,11 +81,9 @@ func (params *SpawnParams) checkScripts() {
}
}

// pipe each script to the process
func createPipedProcesses(params *SpawnParams, stdoutLogsRead *os.File, stderrLogsRead *os.File, stdoutLogsWrite *os.File, stderrLogsWrite *os.File) error {
var err error
var newStdoutLogsRead *os.File
var newErrorLogsRead *os.File
var newStdoutLogsRead, newErrorLogsRead *os.File
for index, script := range params.Scripts {
scriptPath := path.Join(utils.GetMainDirectory(), "scripts", script+".sh")
if index == len(params.Scripts)-1 {
Expand All @@ -104,39 +101,22 @@ func createPipedProcesses(params *SpawnParams, stdoutLogsRead *os.File, stderrLo
return err
}
}
// start stdout piped process
_, err := os.StartProcess("/bin/sh", []string{"/bin/sh", scriptPath}, &os.ProcAttr{
Dir: params.Cwd,
Env: os.Environ(),
Files: []*os.File{
stdoutLogsRead,
stdoutLogsWrite,
params.nullFile,
},
Sys: &syscall.SysProcAttr{
Foreground: false,
},
})
if err != nil {
params.Logger.Fatal().Msg(err.Error())
return err
}
// start stderr piped process
_, err = os.StartProcess("/bin/sh", []string{"/bin/sh", scriptPath}, &os.ProcAttr{
Dir: params.Cwd,
Env: os.Environ(),
Files: []*os.File{
stderrLogsRead,
stderrLogsWrite,
params.nullFile,
},
Sys: &syscall.SysProcAttr{
Foreground: false,
},
})
if err != nil {
params.Logger.Fatal().Msg(err.Error())
return err
for _, stream := range []struct {
read, write *os.File
}{
{stdoutLogsRead, stdoutLogsWrite},
{stderrLogsRead, stderrLogsWrite},
} {
cmd := exec.Command("/bin/sh", scriptPath)
cmd.Dir = params.Cwd
cmd.Env = os.Environ()
cmd.Stdin = stream.read
cmd.Stdout = stream.write
cmd.Stderr = params.nullFile
if err := cmd.Start(); err != nil {
params.Logger.Fatal().Msg(err.Error())
return err
}
}
if newStdoutLogsRead != nil {
stdoutLogsRead = newStdoutLogsRead
Expand All @@ -147,9 +127,7 @@ func createPipedProcesses(params *SpawnParams, stdoutLogsRead *os.File, stderrLo
}

func SpawnNewProcess(params SpawnParams) (*pb.Process, error) {
// validate params
err := params.fillDefaults()
if err != nil {
if err := params.fillDefaults(); err != nil {
return nil, err
}

Expand All @@ -158,91 +136,78 @@ func SpawnNewProcess(params SpawnParams) (*pb.Process, error) {
params.Logger.Warn().Msg("Add -u flag to prevent output buffering on python")
}

// create files
err = params.createFiles()
if err != nil {
if err := params.createFiles(); err != nil {
return nil, err
}

var err error
params.ExecutablePath, err = exec.LookPath(params.ExecutablePath)
if err != nil {
return nil, err
}

var stdoutLogsWrite *os.File
var stdoutLogsRead *os.File

var stderrLogsWrite *os.File
var stderrLogsRead *os.File
var stdoutLogsWrite, stdoutLogsRead, stderrLogsWrite, stderrLogsRead *os.File

if len(params.Scripts) == 0 {
stdoutLogsWrite = params.logFile
stderrLogsWrite = params.errFile
} else {
// create initial stdout pipe
stdoutLogsRead, stdoutLogsWrite, err = os.Pipe()
if err != nil {
return nil, err
}

// create initial err pipe
stderrLogsRead, stderrLogsWrite, err = os.Pipe()
if err != nil {
return nil, err
}

// check if scripts exist
params.checkScripts()
}

defer stdoutLogsRead.Close()
defer stdoutLogsWrite.Close()

// create process
var attr = os.ProcAttr{
Dir: params.Cwd,
Env: os.Environ(),
Files: []*os.File{
params.nullFile,
stdoutLogsWrite,
stderrLogsWrite,
},
Sys: &syscall.SysProcAttr{
Foreground: false,
},
}

defer params.nullFile.Close()
defer params.logFile.Close()
defer params.errFile.Close()

fullCommand := []string{params.ExecutablePath}
fullCommand = append(fullCommand, params.Args...)

process, err := os.StartProcess(params.ExecutablePath, fullCommand, &attr)
if err != nil {
defer func() {
if stdoutLogsRead != nil {
stdoutLogsRead.Close()
}
if stdoutLogsWrite != nil {
stdoutLogsWrite.Close()
}
params.nullFile.Close()
params.logFile.Close()
params.errFile.Close()
}()

cmd := exec.Command(params.ExecutablePath, params.Args...)
cmd.Dir = params.Cwd
cmd.Env = os.Environ()
cmd.Stdin = params.nullFile
cmd.Stdout = stdoutLogsWrite
cmd.Stderr = stderrLogsWrite
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}

if err := cmd.Start(); err != nil {
return nil, err
}

err = createPipedProcesses(&params, stdoutLogsRead, stderrLogsRead, stdoutLogsWrite, stderrLogsWrite)
if err != nil {
return nil, err
if len(params.Scripts) > 0 {
err = createPipedProcesses(&params, stdoutLogsRead, stderrLogsRead, stdoutLogsWrite, stderrLogsWrite)
if err != nil {
return nil, err
}
}

params.Logger.Info().Msgf("[%s] ✓", params.Name)

// write pid to file
err = utils.WritePidToFile(params.PidPilePath, process.Pid)
if err != nil {
if err := utils.WritePidToFile(params.PidPilePath, cmd.Process.Pid); err != nil {
params.Logger.Fatal().Msg(err.Error())
process.Kill()
cmd.Process.Kill()
return nil, err
}

rpcProcess := pb.Process{
rpcProcess := &pb.Process{
Name: params.Name,
ExecutablePath: params.ExecutablePath,
Pid: int32(process.Pid),
Pid: int32(cmd.Process.Pid),
Args: params.Args,
Cwd: params.Cwd,
Scripts: params.Scripts,
Expand All @@ -253,12 +218,5 @@ func SpawnNewProcess(params SpawnParams) (*pb.Process, error) {
CronRestart: params.CronRestart,
}

// detaches the process
err = process.Release()
if err != nil {
params.Logger.Fatal().Msg(err.Error())
return nil, err
}

return &rpcProcess, nil
return rpcProcess, nil
}

0 comments on commit e045428

Please sign in to comment.