Improve error management (#2)
soroosh-tanzadeh authored Aug 7, 2024
1 parent a349e2e commit 393acf9
Showing 5 changed files with 41 additions and 12 deletions.
24 changes: 24 additions & 0 deletions runner/errors.go
@@ -5,3 +5,27 @@ type TaskRunnerError string
 func (tre TaskRunnerError) Error() string {
 	return string(tre)
 }
+
+type TaskExecutionError struct {
+	taskname string
+	err      error
+}
+
+func NewTaskExecutionError(task string, err error) TaskExecutionError {
+	return TaskExecutionError{
+		err:      err,
+		taskname: task,
+	}
+}
+
+func (t TaskExecutionError) Error() string {
+	return t.taskname + ":" + t.err.Error()
+}
+
+func (t TaskExecutionError) GetError() error {
+	return t.err
+}
+
+func (t TaskExecutionError) GetTaskName() string {
+	return t.taskname
+}
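
A minimal consumer-side sketch (not part of this commit): one way code reading from the runner's ErrorChannel() might unwrap the new TaskExecutionError, mirroring the type assertion used in the updated tests below. The drainErrors helper is a hypothetical name, and the snippet assumes it sits in the same package as TaskRunner with a log package that provides Printf.

func drainErrors(t *TaskRunner) {
	// Sketch only: range over the runner's error channel and unwrap the
	// task-aware error type added in runner/errors.go.
	for err := range t.ErrorChannel() {
		if execErr, ok := err.(TaskExecutionError); ok {
			log.Printf("task %q failed: %v", execErr.GetTaskName(), execErr.GetError())
			continue
		}
		log.Printf("runner error: %v", err)
	}
}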
4 changes: 4 additions & 0 deletions runner/task_runner.go
@@ -88,6 +88,10 @@ func (t *TaskRunner) ErrorChannel() chan error {
 	return t.errorChannel
 }
 
+func (t *TaskRunner) captureError(err error) {
+	t.errorChannel <- err
+}
+
 func (t *TaskRunner) Start(ctx context.Context) error {
 	if t.status.Load() != stateInit {
 		return ErrTaskRunnerAlreadyStarted
6 changes: 4 additions & 2 deletions runner/task_runner_test.go
@@ -101,6 +101,7 @@ func (t *TaskRunnerTestSuit) Test_ShouldRetryTaskBaseOnMaxRetry() {
 	counter := atomic.Int64{}
 	expectedPayload := "Test Payload"
 	expectedError := errors.New("I'm Panic Error")
+	expectedErrorWrap := NewTaskExecutionError("task", expectedError)
 	_, taskRunner := t.setupTaskRunner(t.setupRedis())
 
 	taskRunner.RegisterTask(&Task{
@@ -131,7 +132,7 @@ func (t *TaskRunnerTestSuit) Test_ShouldRetryTaskBaseOnMaxRetry() {
			t.Assert().Equal(10, counter.Load())
			break
		case err := <-taskRunner.ErrorChannel():
-			if err != expectedError {
+			if err.(TaskExecutionError).GetError().Error() != expectedErrorWrap.GetError().Error() {
				t.FailNow(err.Error())
			}
		case <-time.After(time.Second):
@@ -144,6 +145,7 @@ func (t *TaskRunnerTestSuit) Test_ShouldRetryTaskWhenPaniced() {
 	counter := atomic.Int64{}
 	expectedPayload := "Test Payload"
 	expectedError := errors.New("I'm Panic Error")
+	expectedErrorWrap := NewTaskExecutionError("task", expectedError)
 	_, taskRunner := t.setupTaskRunner(t.setupRedis())
 
 	taskRunner.RegisterTask(&Task{
@@ -174,7 +176,7 @@ func (t *TaskRunnerTestSuit) Test_ShouldRetryTaskWhenPaniced() {
			t.Assert().Equal(10, counter.Load())
			break
		case err := <-taskRunner.ErrorChannel():
-			if err != expectedError {
+			if err.(TaskExecutionError).GetError().Error() != expectedErrorWrap.GetError().Error() {
				t.FailNow(err.Error())
			}
		case <-time.After(time.Second):
4 changes: 2 additions & 2 deletions runner/timing.go
@@ -45,7 +45,7 @@ func (t *TaskRunner) timingAggregator() {
			continue
		}
 
-		t.errorChannel <- err
+		t.captureError(err)
		continue
	}
 
@@ -59,7 +59,7 @@
	avgTiming := totalExecutionAverage
	queueLen, err := t.queue.Len()
	if err != nil {
-		t.errorChannel <- err
+		t.captureError(err)
		return
	}
	// TODO I don't know if this way of predicting is true or not
15 changes: 7 additions & 8 deletions runner/worker.go
@@ -49,18 +49,17 @@ func (t *TaskRunner) process(ctx context.Context, workerID int) {
				logEntry := log.WithField("worker_id", workerID).WithField("cause", r)
				if ok {
					logEntry = logEntry.WithError(err)
-					resultChannel <- err
+					resultChannel <- NewTaskExecutionError(task.Name, err)
				} else {
-					resultChannel <- TaskRunnerError(fmt.Sprintf("Task %s execution failed", task.Name))
+					resultChannel <- NewTaskExecutionError(task.Name, TaskRunnerError(fmt.Sprintf("Task %s Panic: %v", task.Name, err)))
				}
 
-				logEntry.Error("Task Panic")
-
+				logEntry.Errorf("Task %s Panic", task.Name)
			}
		}()
		// Note: Deferred function calls are pushed onto a stack.
		if err := task.Action(ctx, payload); err != nil {
-			resultChannel <- err
+			resultChannel <- NewTaskExecutionError(task.Name, err)
		}
		resultChannel <- true
	}
@@ -132,15 +131,15 @@ func (t *TaskRunner) process(ctx context.Context, workerID int) {
		case result := <-resultChannel:
			if _, ok := result.(bool); !ok {
				failed()
-				return result.(error)
+				return result.(TaskExecutionError)
			}
			t.processed.Add(1)
			return nil
 
		// Task execution is taking time, send heartbeat to prevent reClaim
		case <-time.After(task.ReservationTimeout):
			if err := hbf(ctx); err != nil {
-				t.errorChannel <- err
+				t.captureError(err)
			}
		}
	}
@@ -151,7 +150,7 @@ func (t *TaskRunner) afterProcess(task *Task, payload any) {
	if task.Unique {
		err := t.releaseLock(task.lockKey(payload), task.lockValue)
		if err != nil {
-			t.errorChannel <- err
+			t.captureError(err)
		}
	}
}
