Skip to content

Commit

Permalink
dag: initial color support
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidGamba committed Oct 11, 2024
1 parent d527aa6 commit 0d2159a
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 19 deletions.
4 changes: 3 additions & 1 deletion changelog.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ Fix typo in dag library: TaskDependensOn -> TaskDependsOn

=== New Features

Add task retries in dag library.
* Add task retries in dag library.

* Add logging color support in dag library.

== v0.30.0: New Features

Expand Down
34 changes: 34 additions & 0 deletions dag/color.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dag

import (
"fmt"
"strings"
)

func (g *Graph) colorInfo(format string) string {
return g.color(g.InfoColor, format)
}

func (g *Graph) colorInfoBold(format string) string {
return g.color(g.InfoBoldColor, format)
}

func (g *Graph) colorError(format string) string {
return g.color(g.ErrorColor, format)
}

func (g *Graph) colorErrorBold(format string) string {
return g.color(g.ErrorBoldColor, format)
}

func (g *Graph) color(color string, format string) string {
if !g.UseColor {
return format
}
if !strings.HasSuffix(format, "\n") {
return fmt.Sprintf("\033[%sm%s\033[0m", color, format)
}

format = format[:len(format)-1]
return fmt.Sprintf("\033[%sm%s\033[0m\n", color, format)
}
89 changes: 89 additions & 0 deletions dag/color_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package dag

import (
"bytes"
"context"
"errors"
"fmt"
"log"
"sync"
"testing"
"time"

"github.com/DavidGamba/go-getoptions"
)

func setupLoggingWithoutTime() *bytes.Buffer {
s := ""
buf := bytes.NewBufferString(s)
Logger.SetFlags(log.Flags() &^ (log.Ldate | log.Ltime))
Logger.SetOutput(buf)
return buf
}

func TestColor(t *testing.T) {
buf := setupLoggingWithoutTime()
t.Cleanup(func() { t.Log(buf.String()) })

var err error

sm := sync.Mutex{}
results := []int{}
generateFn := func(n int) getoptions.CommandFn {
return func(ctx context.Context, opt *getoptions.GetOpt, args []string) error {
time.Sleep(30 * time.Millisecond)
if n == 2 {
return fmt.Errorf("failure reason")
}
sm.Lock()
results = append(results, n)
sm.Unlock()
return nil
}
}

tm := NewTaskMap()
tm.Add("t1", generateFn(1))
tm.Add("t2", generateFn(2))
tm.Add("t3", generateFn(3))

g := NewGraph("test graph").SetSerial()
g.UseColor = true
g.TaskDependsOn(tm.Get("t1"), tm.Get("t2"), tm.Get("t3"))
g.TaskDependsOn(tm.Get("t2"), tm.Get("t3"))

// Validate before running
err = g.Validate(tm)
if err != nil {
t.Errorf("Unexpected error: %s\n", err)
}

err = g.Run(context.Background(), nil, nil)
var errs *Errors
if err == nil || !errors.As(err, &errs) {
t.Fatalf("Unexpected error: %s\n", err)
}
if len(errs.Errors) != 2 {
t.Fatalf("Unexpected error size, %d: %s\n", len(errs.Errors), err)
}
if errs.Errors[0].Error() != "Task test graph:t2 error: failure reason" {
t.Fatalf("Unexpected error: %s\n", errs.Errors[0])
}
if !errors.Is(errs.Errors[1], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
}
if len(results) != 1 || results[0] != 3 {
t.Errorf("Wrong list: %v, len: %d, 0: %d\n", results, len(results), results[0])
}

expected := "\033[34mRunning Task \033[0m\033[36;1mtest graph:t3\033[0m\n" +
"\033[34mCompleted Task \033[0m\033[36;1mtest graph:t3\033[0m\033[34m in 00m:00s\033[0m\n" +
"\033[34mRunning Task \033[0m\033[36;1mtest graph:t2\033[0m\n" +
"\033[34mCompleted Task \033[0m\033[36;1mtest graph:t2\033[0m\033[34m in 00m:00s\033[0m\n" +
"\033[31mTask \033[0m\033[35;1mtest graph:t2\033[0m\033[31m error: failure reason\033[0m\n" +
"\033[31mTask \033[0m\033[35;1mtest graph:t1\033[0m\033[31m error: skipped\033[0m\n" +
"\033[34mCompleted \033[0m\033[36;1mtest graph\033[0m\033[34m Run in 00m:00s\033[0m\n"
if buf.String() != expected {
t.Errorf("Wrong output:\n'%s'\nexpected:\n'%s'\n", buf.String(), expected)
}
}
30 changes: 19 additions & 11 deletions dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type (
bufferOutput bool
bufferWriter io.Writer
bufferMutex sync.Mutex
UseColor bool
InfoColor string
InfoBoldColor string
ErrorColor string
ErrorBoldColor string
}

runStatus int
Expand Down Expand Up @@ -196,6 +201,10 @@ func NewGraph(name string) *Graph {
Vertices: make(map[ID]*Vertex),
errs: &Errors{name, make([]error, 0)},
maxParallel: 1_000_000,
InfoColor: "34",
InfoBoldColor: "36;1",
ErrorColor: "31",
ErrorBoldColor: "35;1",
}
}

Expand Down Expand Up @@ -424,7 +433,7 @@ LOOP:
g.Vertices[iderr.ID].status = runDone
if iderr.Error != nil {
err := fmt.Errorf("Task %s:%s error: %w", g.Name, iderr.ID, iderr.Error)
Logger.Printf("%s\n", err)
Logger.Printf(g.colorError("Task ")+g.colorErrorBold("%s:%s")+g.colorError(" error: %s\n"), g.Name, iderr.ID, iderr.Error)
if !errors.Is(iderr.Error, ErrorSkipParents) {
g.errs.Errors = append(g.errs.Errors, err)
continue
Expand All @@ -443,8 +452,8 @@ LOOP:
if handledContext {
break
}
Logger.Printf("Cancelation received or time out reached, allowing in-progress tasks to finish, skipping the rest.\n")
g.errs.Errors = append(g.errs.Errors, fmt.Errorf("cancelation received or time out reached"))
Logger.Print(g.colorError("Cancellation received or time out reached, allowing in-progress tasks to finish, skipping the rest.\n"))
g.errs.Errors = append(g.errs.Errors, fmt.Errorf("cancellation received or time out reached"))
handledContext = true
default:
break
Expand All @@ -456,7 +465,7 @@ LOOP:
}
if v.status == runSkip {
v.status = runInProgress
Logger.Printf("Skipped Task %s:%s\n", g.Name, v.ID)
Logger.Printf(g.colorError("Skipped Task ")+g.colorErrorBold("%s:%s\n"), g.Name, v.ID)
go func(done chan IDErr, v *Vertex) {
done <- IDErr{v.ID, nil}
}(done, v)
Expand All @@ -472,7 +481,7 @@ LOOP:
go func(ctx context.Context, done chan IDErr, v *Vertex) {
semaphore <- struct{}{}
defer func() { <-semaphore }()
Logger.Printf("Running Task %s:%s\n", g.Name, v.ID)
Logger.Printf(g.colorInfo("Running Task ")+g.colorInfoBold("%s:%s\n"), g.Name, v.ID)
start := time.Now()
v.Task.Lock()
defer v.Task.Unlock()
Expand All @@ -492,21 +501,20 @@ LOOP:
_, _ = combinedBuffer.WriteTo(g.bufferWriter)
g.bufferMutex.Unlock()
}
Logger.Printf("Completed Task %s:%s in %s\n", g.Name, v.ID, durationStr(time.Since(start)))
Logger.Printf(g.colorInfo("Completed Task ")+g.colorInfoBold("%s:%s")+g.colorInfo(" in %s\n"), g.Name, v.ID, durationStr(time.Since(start)))
if err == nil {
break
}
if err != nil && i < v.Retries {
err = fmt.Errorf("Task %s:%s error: %w", g.Name, v.ID, err)
Logger.Printf("%s", err)
Logger.Printf("Retrying (%d/%d) Task %s:%s\n", i+1, v.Retries, g.Name, v.ID)
Logger.Printf(g.colorError("Task ")+g.colorErrorBold("%s:%s")+g.colorError(" error: %s"), g.Name, v.ID, err)
Logger.Printf(g.colorInfo("Retrying (%d/%d) Task %s:%s\n"), i+1, v.Retries, g.Name, v.ID)
}
}
done <- IDErr{v.ID, err}
}(ctx, done, v)
}
}
Logger.Printf("Completed %s Run in %s\n", g.Name, durationStr(time.Since(runStart)))
Logger.Printf(g.colorInfo("Completed ")+g.colorInfoBold("%s")+g.colorInfo(" Run in %s\n"), g.Name, durationStr(time.Since(runStart)))

if len(g.errs.Errors) != 0 {
return g.errs
Expand Down Expand Up @@ -569,7 +577,7 @@ func (g *Graph) getNextVertex() (*Vertex, bool, bool) {

// skipParents - Marks all Vertex parents as runDone
func skipParents(v *Vertex) {
Logger.Printf("skip parents for %s\n", v.ID)
// Logger.Printf("skip parents for %s\n", v.ID)
for _, c := range v.Parents {
c.status = runSkip
skipParents(c)
Expand Down
14 changes: 7 additions & 7 deletions dag/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func TestDagTaskErrorRetry(t *testing.T) {
}
}

func TestDagContexDone(t *testing.T) {
func TestDagContextDone(t *testing.T) {
buf := setupLogging()
t.Cleanup(func() { t.Log(buf.String()) })

Expand Down Expand Up @@ -526,20 +526,20 @@ func TestDagContexDone(t *testing.T) {
if len(errs.Errors) != 5 {
t.Fatalf("Unexpected error size, %d: %s\n", len(errs.Errors), err)
}
if errs.Errors[0].Error() != "cancelation received or time out reached" {
t.Fatalf("Unexpected error: %s\n", errs.Errors[0])
if errs.Errors[0].Error() != "cancellation received or time out reached" {
t.Fatalf("Unexpected error 0: %s\n", errs.Errors[0])
}
if !errors.Is(errs.Errors[1], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
t.Fatalf("Unexpected error 1: %s\n", errs.Errors[1])
}
if !errors.Is(errs.Errors[2], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
t.Fatalf("Unexpected error 2: %s\n", errs.Errors[2])
}
if !errors.Is(errs.Errors[3], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
t.Fatalf("Unexpected error 3: %s\n", errs.Errors[3])
}
if !errors.Is(errs.Errors[4], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
t.Fatalf("Unexpected error 4: %s\n", errs.Errors[4])
}
if len(results) > 4 {
t.Errorf("Wrong list: %v\n", results)
Expand Down

0 comments on commit 0d2159a

Please sign in to comment.