diff --git a/changelog.adoc b/changelog.adoc index a236463..e0d81cb 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -12,7 +12,9 @@ Fix typo in dag library: TaskDependensOn -> TaskDependsOn === New Features -Add task retries in dag library. +* Add task retries in dag library. + +* Add logging color support in dag library. == v0.30.0: New Features diff --git a/dag/color.go b/dag/color.go new file mode 100644 index 0000000..3a34a0c --- /dev/null +++ b/dag/color.go @@ -0,0 +1,34 @@ +package dag + +import ( + "fmt" + "strings" +) + +func (g *Graph) colorInfo(format string) string { + return g.color(g.InfoColor, format) +} + +func (g *Graph) colorInfoBold(format string) string { + return g.color(g.InfoBoldColor, format) +} + +func (g *Graph) colorError(format string) string { + return g.color(g.ErrorColor, format) +} + +func (g *Graph) colorErrorBold(format string) string { + return g.color(g.ErrorBoldColor, format) +} + +func (g *Graph) color(color string, format string) string { + if !g.UseColor { + return format + } + if !strings.HasSuffix(format, "\n") { + return fmt.Sprintf("\033[%sm%s\033[0m", color, format) + } + + format = format[:len(format)-1] + return fmt.Sprintf("\033[%sm%s\033[0m\n", color, format) +} diff --git a/dag/color_test.go b/dag/color_test.go new file mode 100644 index 0000000..f44afbb --- /dev/null +++ b/dag/color_test.go @@ -0,0 +1,89 @@ +package dag + +import ( + "bytes" + "context" + "errors" + "fmt" + "log" + "sync" + "testing" + "time" + + "github.com/DavidGamba/go-getoptions" +) + +func setupLoggingWithoutTime() *bytes.Buffer { + s := "" + buf := bytes.NewBufferString(s) + Logger.SetFlags(log.Flags() &^ (log.Ldate | log.Ltime)) + Logger.SetOutput(buf) + return buf +} + +func TestColor(t *testing.T) { + buf := setupLoggingWithoutTime() + t.Cleanup(func() { t.Log(buf.String()) }) + + var err error + + sm := sync.Mutex{} + results := []int{} + generateFn := func(n int) getoptions.CommandFn { + return func(ctx context.Context, opt *getoptions.GetOpt, args []string) error { + time.Sleep(30 * time.Millisecond) + if n == 2 { + return fmt.Errorf("failure reason") + } + sm.Lock() + results = append(results, n) + sm.Unlock() + return nil + } + } + + tm := NewTaskMap() + tm.Add("t1", generateFn(1)) + tm.Add("t2", generateFn(2)) + tm.Add("t3", generateFn(3)) + + g := NewGraph("test graph").SetSerial() + g.UseColor = true + g.TaskDependsOn(tm.Get("t1"), tm.Get("t2"), tm.Get("t3")) + g.TaskDependsOn(tm.Get("t2"), tm.Get("t3")) + + // Validate before running + err = g.Validate(tm) + if err != nil { + t.Errorf("Unexpected error: %s\n", err) + } + + err = g.Run(context.Background(), nil, nil) + var errs *Errors + if err == nil || !errors.As(err, &errs) { + t.Fatalf("Unexpected error: %s\n", err) + } + if len(errs.Errors) != 2 { + t.Fatalf("Unexpected error size, %d: %s\n", len(errs.Errors), err) + } + if errs.Errors[0].Error() != "Task test graph:t2 error: failure reason" { + t.Fatalf("Unexpected error: %s\n", errs.Errors[0]) + } + if !errors.Is(errs.Errors[1], ErrorTaskSkipped) { + t.Fatalf("Unexpected error: %s\n", errs.Errors[1]) + } + if len(results) != 1 || results[0] != 3 { + t.Errorf("Wrong list: %v, len: %d, 0: %d\n", results, len(results), results[0]) + } + + expected := "\033[34mRunning Task \033[0m\033[36;1mtest graph:t3\033[0m\n" + + "\033[34mCompleted Task \033[0m\033[36;1mtest graph:t3\033[0m\033[34m in 00m:00s\033[0m\n" + + "\033[34mRunning Task \033[0m\033[36;1mtest graph:t2\033[0m\n" + + "\033[34mCompleted Task \033[0m\033[36;1mtest graph:t2\033[0m\033[34m in 00m:00s\033[0m\n" + + "\033[31mTask \033[0m\033[35;1mtest graph:t2\033[0m\033[31m error: failure reason\033[0m\n" + + "\033[31mTask \033[0m\033[35;1mtest graph:t1\033[0m\033[31m error: skipped\033[0m\n" + + "\033[34mCompleted \033[0m\033[36;1mtest graph\033[0m\033[34m Run in 00m:00s\033[0m\n" + if buf.String() != expected { + t.Errorf("Wrong output:\n'%s'\nexpected:\n'%s'\n", buf.String(), expected) + } +} diff --git a/dag/dag.go b/dag/dag.go index e79a832..9324131 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -79,6 +79,11 @@ type ( bufferOutput bool bufferWriter io.Writer bufferMutex sync.Mutex + UseColor bool + InfoColor string + InfoBoldColor string + ErrorColor string + ErrorBoldColor string } runStatus int @@ -196,6 +201,10 @@ func NewGraph(name string) *Graph { Vertices: make(map[ID]*Vertex), errs: &Errors{name, make([]error, 0)}, maxParallel: 1_000_000, + InfoColor: "34", + InfoBoldColor: "36;1", + ErrorColor: "31", + ErrorBoldColor: "35;1", } } @@ -424,7 +433,7 @@ LOOP: g.Vertices[iderr.ID].status = runDone if iderr.Error != nil { err := fmt.Errorf("Task %s:%s error: %w", g.Name, iderr.ID, iderr.Error) - Logger.Printf("%s\n", err) + Logger.Printf(g.colorError("Task ")+g.colorErrorBold("%s:%s")+g.colorError(" error: %s\n"), g.Name, iderr.ID, iderr.Error) if !errors.Is(iderr.Error, ErrorSkipParents) { g.errs.Errors = append(g.errs.Errors, err) continue @@ -443,8 +452,8 @@ LOOP: if handledContext { break } - Logger.Printf("Cancelation received or time out reached, allowing in-progress tasks to finish, skipping the rest.\n") - g.errs.Errors = append(g.errs.Errors, fmt.Errorf("cancelation received or time out reached")) + Logger.Print(g.colorError("Cancellation received or time out reached, allowing in-progress tasks to finish, skipping the rest.\n")) + g.errs.Errors = append(g.errs.Errors, fmt.Errorf("cancellation received or time out reached")) handledContext = true default: break @@ -456,7 +465,7 @@ LOOP: } if v.status == runSkip { v.status = runInProgress - Logger.Printf("Skipped Task %s:%s\n", g.Name, v.ID) + Logger.Printf(g.colorError("Skipped Task ")+g.colorErrorBold("%s:%s\n"), g.Name, v.ID) go func(done chan IDErr, v *Vertex) { done <- IDErr{v.ID, nil} }(done, v) @@ -472,7 +481,7 @@ LOOP: go func(ctx context.Context, done chan IDErr, v *Vertex) { semaphore <- struct{}{} defer func() { <-semaphore }() - Logger.Printf("Running Task %s:%s\n", g.Name, v.ID) + Logger.Printf(g.colorInfo("Running Task ")+g.colorInfoBold("%s:%s\n"), g.Name, v.ID) start := time.Now() v.Task.Lock() defer v.Task.Unlock() @@ -492,21 +501,20 @@ LOOP: _, _ = combinedBuffer.WriteTo(g.bufferWriter) g.bufferMutex.Unlock() } - Logger.Printf("Completed Task %s:%s in %s\n", g.Name, v.ID, durationStr(time.Since(start))) + Logger.Printf(g.colorInfo("Completed Task ")+g.colorInfoBold("%s:%s")+g.colorInfo(" in %s\n"), g.Name, v.ID, durationStr(time.Since(start))) if err == nil { break } if err != nil && i < v.Retries { - err = fmt.Errorf("Task %s:%s error: %w", g.Name, v.ID, err) - Logger.Printf("%s", err) - Logger.Printf("Retrying (%d/%d) Task %s:%s\n", i+1, v.Retries, g.Name, v.ID) + Logger.Printf(g.colorError("Task ")+g.colorErrorBold("%s:%s")+g.colorError(" error: %s"), g.Name, v.ID, err) + Logger.Printf(g.colorInfo("Retrying (%d/%d) Task %s:%s\n"), i+1, v.Retries, g.Name, v.ID) } } done <- IDErr{v.ID, err} }(ctx, done, v) } } - Logger.Printf("Completed %s Run in %s\n", g.Name, durationStr(time.Since(runStart))) + Logger.Printf(g.colorInfo("Completed ")+g.colorInfoBold("%s")+g.colorInfo(" Run in %s\n"), g.Name, durationStr(time.Since(runStart))) if len(g.errs.Errors) != 0 { return g.errs @@ -569,7 +577,7 @@ func (g *Graph) getNextVertex() (*Vertex, bool, bool) { // skipParents - Marks all Vertex parents as runDone func skipParents(v *Vertex) { - Logger.Printf("skip parents for %s\n", v.ID) + // Logger.Printf("skip parents for %s\n", v.ID) for _, c := range v.Parents { c.status = runSkip skipParents(c) diff --git a/dag/dag_test.go b/dag/dag_test.go index b8e800f..ae97035 100644 --- a/dag/dag_test.go +++ b/dag/dag_test.go @@ -480,7 +480,7 @@ func TestDagTaskErrorRetry(t *testing.T) { } } -func TestDagContexDone(t *testing.T) { +func TestDagContextDone(t *testing.T) { buf := setupLogging() t.Cleanup(func() { t.Log(buf.String()) }) @@ -526,20 +526,20 @@ func TestDagContexDone(t *testing.T) { if len(errs.Errors) != 5 { t.Fatalf("Unexpected error size, %d: %s\n", len(errs.Errors), err) } - if errs.Errors[0].Error() != "cancelation received or time out reached" { - t.Fatalf("Unexpected error: %s\n", errs.Errors[0]) + if errs.Errors[0].Error() != "cancellation received or time out reached" { + t.Fatalf("Unexpected error 0: %s\n", errs.Errors[0]) } if !errors.Is(errs.Errors[1], ErrorTaskSkipped) { - t.Fatalf("Unexpected error: %s\n", errs.Errors[1]) + t.Fatalf("Unexpected error 1: %s\n", errs.Errors[1]) } if !errors.Is(errs.Errors[2], ErrorTaskSkipped) { - t.Fatalf("Unexpected error: %s\n", errs.Errors[1]) + t.Fatalf("Unexpected error 2: %s\n", errs.Errors[2]) } if !errors.Is(errs.Errors[3], ErrorTaskSkipped) { - t.Fatalf("Unexpected error: %s\n", errs.Errors[1]) + t.Fatalf("Unexpected error 3: %s\n", errs.Errors[3]) } if !errors.Is(errs.Errors[4], ErrorTaskSkipped) { - t.Fatalf("Unexpected error: %s\n", errs.Errors[1]) + t.Fatalf("Unexpected error 4: %s\n", errs.Errors[4]) } if len(results) > 4 { t.Errorf("Wrong list: %v\n", results)