Skip to content

Commit

Permalink
WorkerPools always warn (and never exit on errors) (#1077)
Browse files Browse the repository at this point in the history
  • Loading branch information
timburks authored Mar 8, 2023
1 parent c46b9de commit b9f77d1
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cmd/registry/cmd/compute/score/score.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func Command() *cobra.Command {
if err != nil {
log.FromContext(ctx).WithError(err).Fatal("Failed to get jobs from flags")
}
taskQueue, wait := tasks.WorkerPoolWithWarnings(ctx, jobs)
taskQueue, wait := tasks.WorkerPool(ctx, jobs)
defer wait()

inputPattern, err := patterns.ParseResourcePattern(args[0])
Expand Down
2 changes: 1 addition & 1 deletion cmd/registry/cmd/compute/scorecard/scorecard.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func Command() *cobra.Command {
if err != nil {
log.FromContext(ctx).WithError(err).Fatal("Failed to get jobs from flags")
}
taskQueue, wait := tasks.WorkerPoolWithWarnings(ctx, jobs)
taskQueue, wait := tasks.WorkerPool(ctx, jobs)
defer wait()

inputPattern, err := patterns.ParseResourcePattern(args[0])
Expand Down
2 changes: 1 addition & 1 deletion cmd/registry/cmd/compute/vocabulary/vocabulary.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Command() *cobra.Command {
if err != nil {
log.FromContext(ctx).WithError(err).Fatal("Failed to get jobs from flags")
}
taskQueue, wait := tasks.WorkerPoolWithWarnings(ctx, jobs)
taskQueue, wait := tasks.WorkerPool(ctx, jobs)
defer wait()

parsed, err := names.ParseSpecRevision(path)
Expand Down
2 changes: 1 addition & 1 deletion cmd/registry/cmd/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func Command() *cobra.Command {
}

log.Debug(ctx, "Starting execution...")
taskQueue, wait := tasks.WorkerPoolWithWarnings(ctx, jobs)
taskQueue, wait := tasks.WorkerPool(ctx, jobs)
defer wait()
// Submit tasks to taskQueue
for i := 0; i < len(actions) && i < maxActions; i++ {
Expand Down
23 changes: 1 addition & 22 deletions cmd/registry/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,6 @@ func WorkerPool(ctx context.Context, n int) (chan<- Task, func()) {
return taskQueue, wait
}

// Similar to WorkerPool except it creates workers which log task errors as "Warnings"
func WorkerPoolWithWarnings(ctx context.Context, n int) (chan<- Task, func()) {
var wg sync.WaitGroup
taskQueue := make(chan Task, 1024)
for i := 0; i < n; i++ {
wg.Add(1)
go worker(ctx, &wg, taskQueue, true)
}

wait := func() {
close(taskQueue)
wg.Wait()
}

return taskQueue, wait
}

// A worker which pulls tasks from the taskQueue, executes them and logs errors if any.
func worker(ctx context.Context, wg *sync.WaitGroup, taskQueue <-chan Task, warnOnError bool) {
defer wg.Done()
Expand All @@ -75,11 +58,7 @@ func worker(ctx context.Context, wg *sync.WaitGroup, taskQueue <-chan Task, warn
return
default:
if err := task.Run(ctx); err != nil {
if warnOnError {
log.FromContext(ctx).WithError(err).Warnf("Task failed: %s", task)
} else {
log.FromContext(ctx).WithError(err).Fatalf("Task failed: %s", task)
}
log.FromContext(ctx).WithError(err).Warnf("Task failed: %s", task)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/registry/tasks/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestWorkerPoolWithWarnings(t *testing.T) {
ctx := context.Background()
jobs := 1

taskQueue, wait := WorkerPoolWithWarnings(ctx, jobs)
taskQueue, wait := WorkerPool(ctx, jobs)
defer wait()

for i := 0; i < 10; i++ {
Expand Down

0 comments on commit b9f77d1

Please sign in to comment.