Skip to content

Commit

Permalink
Better failure reporting from queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jonyoder committed Aug 1, 2023
1 parent 5a022d5 commit e97ce48
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 110 deletions.
81 changes: 22 additions & 59 deletions examples/cmd/markdownRenderer/store/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package store
import (
"database/sql"
"encoding/json"
"errors"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -182,12 +181,12 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
c.Assert(err, check.DeepEquals, queue.ErrDuplicateAddressedPush)

// Addresses should not be completed
done, err := s.store.IsQueueAddressComplete("abc")
queued, err := s.store.IsQueueAddressInProgress("abc")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, false)
done, err = s.store.IsQueueAddressComplete("def")
c.Check(queued, check.Equals, true)
queued, err = s.store.IsQueueAddressInProgress("def")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, false)
c.Check(queued, check.Equals, true)

// Attempt to pop a type that doesn't exist in the queue
queueWork, err := s.store.QueuePop("test", 0, []uint64{888})
Expand All @@ -206,12 +205,12 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
c.Assert(err, check.IsNil)

// First address should be completed
done, err = s.store.IsQueueAddressComplete("abc")
queued, err = s.store.IsQueueAddressInProgress("abc")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, true)
done, err = s.store.IsQueueAddressComplete("def")
c.Check(queued, check.Equals, false)
queued, err = s.store.IsQueueAddressInProgress("def")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, false)
c.Check(queued, check.Equals, true)

// Pop second item
queueWork, err = s.store.QueuePop("test", 0, []uint64{TypeTest, TypeTest2})
Expand All @@ -225,22 +224,18 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
c.Assert(err, check.IsNil)

// Second address should be completed
done, err = s.store.IsQueueAddressComplete("def")
queued, err = s.store.IsQueueAddressInProgress("def")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, true)
c.Check(queued, check.Equals, false)

// Record error for second address
err = s.store.QueueAddressedComplete("def", errors.New("some error"))
// Second address should be completed
queued, err = s.store.IsQueueAddressInProgress("def")
c.Assert(err, check.IsNil)

// Second address should be completed, but with error
done, err = s.store.IsQueueAddressComplete("def")
c.Assert(err, check.ErrorMatches, "some error")
c.Check(done, check.Equals, true)
c.Check(queued, check.Equals, false)

// Cannot check for empty address
_, err = s.store.IsQueueAddressComplete(" ")
c.Check(err, check.ErrorMatches, "no address provided for IsQueueAddressComplete")
_, err = s.store.IsQueueAddressInProgress(" ")
c.Check(err, check.ErrorMatches, "no address provided for IsQueueAddressInProgress")
}

func (s *QueueSqliteSuite) TestQueueGroups(c *check.C) {
Expand Down Expand Up @@ -593,43 +588,6 @@ func (s *QueueSqliteSuite) TestQueueGroupCancel(c *check.C) {
c.Check(cancelled, check.Equals, true)
}

// TestAddressFailure verifies how failures recorded for an address via
// QueueAddressedComplete are reported back by QueueAddressedCheck: a generic
// error comes back as a *queue.QueueError with Code 0, a typed
// *queue.QueueError comes back with its code and message intact, and
// recording nil clears the previously recorded failure.
func (s *QueueSqliteSuite) TestAddressFailure(c *check.C) {
// Fail
address := "abcdefg"
// Pass a generic error
err := s.store.QueueAddressedComplete(address, errors.New("first error"))
c.Assert(err, check.IsNil)
err = s.store.(*store).QueueAddressedCheck(address)
c.Check(err, check.ErrorMatches, "first error")

// Should be castable to queue.QueueError pointer
queueError, ok := err.(*queue.QueueError)
c.Assert(ok, check.Equals, true)
c.Check(queueError, check.DeepEquals, &queue.QueueError{
Code: 0, // Not set on generic error
Message: "first error",
})

// Fail again, but pass a typed error
err = s.store.QueueAddressedComplete(address, &queue.QueueError{Code: 404, Message: "second error"})
c.Assert(err, check.IsNil)
err = s.store.(*store).QueueAddressedCheck(address)
c.Check(err, check.ErrorMatches, "second error")

// Should be castable to queue.QueueError pointer
queueError, ok = err.(*queue.QueueError)
c.Assert(ok, check.Equals, true)
c.Check(queueError, check.DeepEquals, &queue.QueueError{
Code: 404,
Message: "second error",
})

// Don't fail
err = s.store.QueueAddressedComplete(address, nil)
c.Assert(err, check.IsNil)
c.Check(s.store.(*store).QueueAddressedCheck(address), check.IsNil)
}

func (s *QueueSqliteSuite) TestIsQueueAddressInProgress(c *check.C) {
// Is a non-existing address in the queue?
found, err := s.store.IsQueueAddressInProgress("def")
Expand All @@ -645,12 +603,17 @@ func (s *QueueSqliteSuite) TestIsQueueAddressInProgress(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(found, check.Equals, true)

// Assign a permit
w, err := s.store.QueuePop("test", 0, []uint64{TypeTest})
c.Assert(err, check.IsNil)
c.Assert(w.Permit, check.Not(check.Equals), permit.Permit(0))

// Complete the work
err = s.store.QueueAddressedComplete("def", nil)
err = s.store.QueueDelete(w.Permit)
c.Assert(err, check.IsNil)
found, err = s.store.IsQueueAddressInProgress("def")

// Now it should not be found
c.Assert(err, check.IsNil)
c.Assert(found, check.Equals, true)
c.Assert(found, check.Equals, false)
}
24 changes: 8 additions & 16 deletions pkg/rsqueue/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
}

// Run the job (blocks)
func() {

jobError := func() (result error) {
// Extend the job's heartbeat in the queue periodically until the job completes.
done := make(chan struct{})
defer close(done)
Expand All @@ -362,7 +361,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
case <-ticker.C:
// Extend the job visibility
a.traceLogger.Debugf("Job of type %d visibility timeout needs to be extended: %d\n", queueWork.WorkType, queueWork.Permit)
err := a.queue.Extend(queueWork.Permit)
err = a.queue.Extend(queueWork.Permit)
if err != nil {
a.logger.Debugf("Error extending job for work type %d: %s", queueWork.WorkType, err)
}
Expand All @@ -374,24 +373,17 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j

// Actually run the job
a.traceLogger.Debugf("Running job with type %d, address '%s', and permit '%d'", queueWork.WorkType, queueWork.Address, queueWork.Permit)
err := a.runner.Run(queue.RecursableWork{
err = a.runner.Run(queue.RecursableWork{
Work: queueWork.Work,
WorkType: queueWork.WorkType,
Context: ctx,
})
if err != nil {
result = err
a.logger.Debugf("Job type %d: address: %s work: %#v returned error: %s\n", queueWork.WorkType, queueWork.Address, string(queueWork.Work), err)
}

// If the work was addressed, record the result
if queueWork.Address != "" {
// Note, `RecordFailure` will record the error if err != nil. If
// err == nil, then it clears any recorded error for the address.
err = a.queue.RecordFailure(queueWork.Address, err)
if err != nil {
a.logger.Debugf("Failed while recording addressed work success/failure: %s\n", err)
}
}
return
}()

// Decrement the job count
Expand All @@ -416,16 +408,16 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
// Delete the job from the queue
a.traceLogger.Debugf("Deleting job from queue: %d\n", queueWork.Permit)

if err := a.queue.Delete(queueWork.Permit); err != nil {
a.logger.Debugf("queue Delete() returned error: %s", err)
if deleteErr := a.queue.Delete(queueWork.Permit); deleteErr != nil {
a.logger.Debugf("queue Delete() returned error: %s", deleteErr)
}

// Notify that work is complete if work is addressed. This must happen after the work has been deleted from the
// queue.
if queueWork.Address != "" {
n := time.Now()
a.traceLogger.Debugf("Ready to notify of address %s", queueWork.Address)
notify(agenttypes.NewWorkCompleteNotification(queueWork.Address, a.notifyTypeWorkComplete))
notify(agenttypes.NewWorkCompleteNotification(queueWork.Address, a.notifyTypeWorkComplete, jobError))
a.traceLogger.Debugf("Notified of address %s in %d ms", queueWork.Address, time.Now().Sub(n).Nanoseconds()/1000000)
}

Expand Down
31 changes: 11 additions & 20 deletions pkg/rsqueue/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ type FakeQueue struct {
extend error
mutex sync.Mutex
record error
errs []error
}

func (*FakeQueue) Push(priority uint64, groupId int64, work queue.Work) error {
Expand All @@ -138,12 +137,6 @@ func (*FakeQueue) IsAddressInQueue(address string) (bool, error) {
// PollAddress returns the fake's preconfigured pollErrs channel. The address
// argument is not used.
func (f *FakeQueue) PollAddress(address string) (errs <-chan error) {
return f.pollErrs
}
// RecordFailure is the fake's stand-in for recording the outcome of addressed
// work. When a canned error is configured in f.record, that error is returned
// and nothing is stored. Otherwise the supplied failure (which may be nil) is
// appended to f.errs and nil is returned. The address argument is not used.
func (f *FakeQueue) RecordFailure(address string, failure error) error {
	if canned := f.record; canned != nil {
		return canned
	}
	f.errs = append(f.errs, failure)
	return nil
}
// Get is a no-op stub: it ignores all of its arguments and returns nil work
// together with a nil error.
func (*FakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) {
return nil, nil
}
Expand Down Expand Up @@ -214,9 +207,9 @@ func (s *AgentSuite) TestWaitBlocked(c *check.C) {
}()

// Send some messages across the queue while blocked and make sure they're discarded
msgs <- agenttypes.NewWorkCompleteNotification("somewhere1", 10)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere2", 10)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere3", 10)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere1", 10, nil)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere2", 10, nil)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere3", 10, nil)

c.Check(completed, check.Equals, false)
jobDone <- 98
Expand All @@ -229,7 +222,6 @@ func (s *AgentSuite) TestWaitBlocked(c *check.C) {
func (s *AgentSuite) TestRunJobOk(c *check.C) {
q := &FakeQueue{
extended: make(map[permit.Permit]int),
errs: make([]error, 0),
}
finish := map[string]chan bool{
"one": make(chan bool),
Expand Down Expand Up @@ -316,11 +308,6 @@ func (s *AgentSuite) TestRunJobOk(c *check.C) {
c.Check(receivedPriority[1], check.Equals, MAX_CONCURRENCY)
c.Check(receivedPriority[2], check.Equals, MAX_CONCURRENCY)

// Jobs b2 and b3 should have been marked as successful since they were addressed
c.Check(q.errs, check.HasLen, 2)
c.Check(q.errs[0], check.IsNil)
c.Check(q.errs[1], check.IsNil)

// Jobs should have been deleted
c.Check(q.deleted, check.Equals, 3)

Expand All @@ -337,7 +324,6 @@ func (s *AgentSuite) TestRunJobOk(c *check.C) {
func (s *AgentSuite) TestRunJobFails(c *check.C) {
q := &FakeQueue{
extended: make(map[permit.Permit]int),
errs: make([]error, 0),
}
finish := map[string]chan bool{
"one": make(chan bool),
Expand All @@ -363,7 +349,10 @@ func (s *AgentSuite) TestRunJobFails(c *check.C) {
b1, err := json.Marshal(&w1)
c.Assert(err, check.IsNil)

n := func(n listener.Notification) {}
var notified listener.Notification
n := func(n listener.Notification) {
notified = n
}

maxPriorityDone := make(chan struct{})
go func() {
Expand All @@ -387,8 +376,10 @@ func (s *AgentSuite) TestRunJobFails(c *check.C) {
<-maxPriorityDone

// The job should have been marked as failed since it was addressed
c.Check(q.errs, check.HasLen, 1)
c.Check(q.errs[0], check.ErrorMatches, "work failed")
c.Assert(notified, check.FitsTypeOf, &agenttypes.WorkCompleteNotification{})
workNotified, ok := notified.(*agenttypes.WorkCompleteNotification)
c.Assert(ok, check.Equals, true)
c.Assert(workNotified.Error, check.ErrorMatches, "work failed")

// Jobs should have been deleted
c.Check(q.deleted, check.Equals, 1)
Expand Down
4 changes: 3 additions & 1 deletion pkg/rsqueue/agent/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ type Notify func(n listener.Notification)
// WorkCompleteNotification is a notification describing the completion of
// addressed work. It embeds listener.GenericNotification and adds the address
// of the work plus the error (if any) reported for it.
type WorkCompleteNotification struct {
listener.GenericNotification
// Address is the address of the work this notification refers to.
Address string
// Error is the error supplied when the notification was created; nil when
// no error was supplied.
Error error
}

func NewWorkCompleteNotification(address string, workType uint8) *WorkCompleteNotification {
func NewWorkCompleteNotification(address string, workType uint8, err error) *WorkCompleteNotification {
return &WorkCompleteNotification{
GenericNotification: listener.GenericNotification{
NotifyGuid: uuid.New().String(),
NotifyType: workType,
},
Address: address,
Error: err,
}
}
17 changes: 16 additions & 1 deletion pkg/rsqueue/agent/types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agenttypes
// Copyright (C) 2022 by RStudio, PBC

import (
"errors"
"testing"

"github.com/rstudio/platform-lib/pkg/rsnotify/listener"
Expand All @@ -16,7 +17,7 @@ func TestPackage(t *testing.T) { check.TestingT(t) }
var _ = check.Suite(&AgentTypesSuite{})

func (s *AgentTypesSuite) TestWorkCompleteNotification(c *check.C) {
cn := NewWorkCompleteNotification("some_address", 10)
cn := NewWorkCompleteNotification("some_address", 10, nil)
// Guid should be set
c.Assert(cn.Guid(), check.Not(check.Equals), "")
cn.NotifyGuid = ""
Expand All @@ -27,4 +28,18 @@ func (s *AgentTypesSuite) TestWorkCompleteNotification(c *check.C) {

Address: "some_address",
})

err := errors.New("some error")
cn = NewWorkCompleteNotification("some_address", 10, err)
// Guid should be set
c.Assert(cn.Guid(), check.Not(check.Equals), "")
cn.NotifyGuid = ""
c.Assert(cn, check.DeepEquals, &WorkCompleteNotification{
GenericNotification: listener.GenericNotification{
NotifyType: uint8(10),
},

Address: "some_address",
Error: err,
})
}
3 changes: 0 additions & 3 deletions pkg/rsqueue/groups/grouprunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ func (f *fakeQueue) IsAddressInQueue(address string) (bool, error) {
// PollAddress returns the fake's preconfigured pollErrs channel. The address
// argument is not used.
func (f *fakeQueue) PollAddress(address string) (errs <-chan error) {
return f.pollErrs
}
// RecordFailure ignores both arguments and returns the fake's preconfigured
// record error (nil when none was set).
func (f *fakeQueue) RecordFailure(address string, failure error) error {
return f.record
}
// Get is not implemented by this fake: it ignores its arguments and always
// returns nil work along with an "n/i" error.
func (f *fakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) {
return nil, errors.New("n/i")
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/rsqueue/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ type Queue interface {
// * work the work to push into the queue. Must be JSON-serializable
AddressedPush(priority uint64, groupId int64, address string, work Work) error

// RecordFailure records a failure of addressed work in the queue
// * address the address of the work that failed
// * failure the error that occurred. Overwrites any previous error information
// from earlier runs of the same work. If `failure==nil`, the error is cleared.
RecordFailure(address string, failure error) error

// PollAddress polls addressed work in the queue and returns an `errs` channel
// to report when the work is done and/or an error has occurred. We pass
// `nil` over the errs channel when the poll has completed without errors
Expand Down
4 changes: 0 additions & 4 deletions pkg/rsqueue/queue/recording_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ func (q *RecordingProducer) PollAddress(address string) (errs <-chan error) {
return q.PollErrs
}

// RecordFailure ignores both arguments and returns the producer's RecordErr
// field as-is.
func (q *RecordingProducer) RecordFailure(address string, failure error) error {
return q.RecordErr
}

// Get ignores its arguments and always returns nil work together with kaboom
// (declared outside this view).
func (q *RecordingProducer) Get(maxPriority uint64, maxPriorityChan chan uint64, types QueueSupportedTypes, stop chan bool) (*QueueWork, error) {
return nil, kaboom
}
Expand Down

0 comments on commit e97ce48

Please sign in to comment.