Skip to content

Commit

Permalink
use stdout/stderr pipe to send chan data logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Son Roy Almerol committed Dec 16, 2024
1 parent eb7fa9f commit d2e15af
Showing 1 changed file with 36 additions and 23 deletions.
59 changes: 36 additions & 23 deletions internal/backend/backup/jobrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package backup

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -65,6 +64,8 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{
srcPath = filepath.Join(srcPath, job.Subpath)

taskChan := make(chan store.Task)
logChan := make(chan string, 100)

watchCtx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -143,11 +144,27 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{
}
}

logBuffer := bytes.Buffer{}
writer := io.MultiWriter(os.Stdout, &logBuffer)
cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("RunBackup: error creating stdout pipe -> %w", err)
}
cmdStderr, err := cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("RunBackup: error creating stderr pipe -> %w", err)
}

go func() {
defer close(logChan)
readers := io.MultiReader(cmdStdout, cmdStderr)
scanner := bufio.NewScanner(readers)
for scanner.Scan() {
logChan <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Printf("Log reader error: %v", err)
}
}()

cmd.Stdout = writer
cmd.Stderr = writer
err = cmd.Start()
if err != nil {
if agentMount != nil {
Expand Down Expand Up @@ -190,8 +207,6 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{
}
defer logFile.Close()
writer := bufio.NewWriter(logFile)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

taskHasError := false
for {
Expand All @@ -208,23 +223,21 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{
}
writer.Flush()
return
case <-ticker.C:
lines := strings.Split(logBuffer.String(), "\n")
for _, line := range lines {
if line != "" {
if strings.Contains(line, "Error: upload failed:") {
line = strings.Replace(line, "Error:", "TASK ERROR:", 1)
taskHasError = true
}

_, err := writer.WriteString(formattedTime + ": " + line + "\n")
if err != nil {
log.Printf("Failed to write logs for task %s: %v", task.UPID, err)
return
}
}
case logLine, ok := <-logChan:
if !ok {
continue
}

if strings.Contains(logLine, "Error: upload failed:") {
logLine = strings.Replace(logLine, "Error:", "TASK ERROR:", 1)
taskHasError = true
}

_, err := writer.WriteString(formattedTime + ": " + logLine + "\n")
if err != nil {
log.Printf("Failed to write logs for task %s: %v", task.UPID, err)
return
}
logBuffer.Reset()
writer.Flush()
}
}
Expand Down

0 comments on commit d2e15af

Please sign in to comment.