diff --git a/examples/cmd/markdownRenderer/store/queue_test.go b/examples/cmd/markdownRenderer/store/queue_test.go index 06a8869..5f4defe 100644 --- a/examples/cmd/markdownRenderer/store/queue_test.go +++ b/examples/cmd/markdownRenderer/store/queue_test.go @@ -5,7 +5,6 @@ package store import ( "database/sql" "encoding/json" - "errors" "io/ioutil" "os" "testing" @@ -182,12 +181,12 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) { c.Assert(err, check.DeepEquals, queue.ErrDuplicateAddressedPush) // Addresses should not be completed - done, err := s.store.IsQueueAddressComplete("abc") + queued, err := s.store.IsQueueAddressInProgress("abc") c.Assert(err, check.IsNil) - c.Check(done, check.Equals, false) - done, err = s.store.IsQueueAddressComplete("def") + c.Check(queued, check.Equals, true) + queued, err = s.store.IsQueueAddressInProgress("def") c.Assert(err, check.IsNil) - c.Check(done, check.Equals, false) + c.Check(queued, check.Equals, true) // Attempt to pop a type that doesn't exist in the queue queueWork, err := s.store.QueuePop("test", 0, []uint64{888}) @@ -206,12 +205,12 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) { c.Assert(err, check.IsNil) // First address should be completed - done, err = s.store.IsQueueAddressComplete("abc") + queued, err = s.store.IsQueueAddressInProgress("abc") c.Assert(err, check.IsNil) - c.Check(done, check.Equals, true) - done, err = s.store.IsQueueAddressComplete("def") + c.Check(queued, check.Equals, false) + queued, err = s.store.IsQueueAddressInProgress("def") c.Assert(err, check.IsNil) - c.Check(done, check.Equals, false) + c.Check(queued, check.Equals, true) // Pop second item queueWork, err = s.store.QueuePop("test", 0, []uint64{TypeTest, TypeTest2}) @@ -225,22 +224,18 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) { c.Assert(err, check.IsNil) // Second address should be completed - done, err = s.store.IsQueueAddressComplete("def") + queued, err = s.store.IsQueueAddressInProgress("def") c.Assert(err, check.IsNil) - c.Check(done, check.Equals, true) + c.Check(queued, check.Equals, false) - // Record error for second address - err = s.store.QueueAddressedComplete("def", errors.New("some error")) + // Second address should be completed + queued, err = s.store.IsQueueAddressInProgress("def") c.Assert(err, check.IsNil) - - // Second address should be completed, but with error - done, err = s.store.IsQueueAddressComplete("def") - c.Assert(err, check.ErrorMatches, "some error") - c.Check(done, check.Equals, true) + c.Check(queued, check.Equals, false) // Cannot check for empty address - _, err = s.store.IsQueueAddressComplete(" ") - c.Check(err, check.ErrorMatches, "no address provided for IsQueueAddressComplete") + _, err = s.store.IsQueueAddressInProgress(" ") + c.Check(err, check.ErrorMatches, "no address provided for IsQueueAddressInProgress") } func (s *QueueSqliteSuite) TestQueueGroups(c *check.C) { @@ -593,43 +588,6 @@ func (s *QueueSqliteSuite) TestQueueGroupCancel(c *check.C) { c.Check(cancelled, check.Equals, true) } -func (s *QueueSqliteSuite) TestAddressFailure(c *check.C) { - // Fail - address := "abcdefg" - // Pass a generic error - err := s.store.QueueAddressedComplete(address, errors.New("first error")) - c.Assert(err, check.IsNil) - err = s.store.(*store).QueueAddressedCheck(address) - c.Check(err, check.ErrorMatches, "first error") - - // Should be castable to queue.QueueError pointer - queueError, ok := err.(*queue.QueueError) - c.Assert(ok, check.Equals, true) - c.Check(queueError, check.DeepEquals, &queue.QueueError{ - Code: 0, // Not set on generic error - Message: "first error", - }) - - // Fail again, but pass a typed error - err = s.store.QueueAddressedComplete(address, &queue.QueueError{Code: 404, Message: "second error"}) - c.Assert(err, check.IsNil) - err = s.store.(*store).QueueAddressedCheck(address) - c.Check(err, check.ErrorMatches, "second error") - - // Should be castable to queue.QueueError pointer - queueError, ok = err.(*queue.QueueError) - c.Assert(ok, check.Equals, true) - c.Check(queueError, check.DeepEquals, &queue.QueueError{ - Code: 404, - Message: "second error", - }) - - // Don't fail - err = s.store.QueueAddressedComplete(address, nil) - c.Assert(err, check.IsNil) - c.Check(s.store.(*store).QueueAddressedCheck(address), check.IsNil) -} - func (s *QueueSqliteSuite) TestIsQueueAddressInProgress(c *check.C) { // Is a non-existing address in the queue? found, err := s.store.IsQueueAddressInProgress("def") @@ -645,12 +603,17 @@ func (s *QueueSqliteSuite) TestIsQueueAddressInProgress(c *check.C) { c.Assert(err, check.IsNil) c.Assert(found, check.Equals, true) + // Assign a permit + w, err := s.store.QueuePop("test", 0, []uint64{TypeTest}) + c.Assert(err, check.IsNil) + c.Assert(w.Permit, check.Not(check.Equals), permit.Permit(0)) + // Complete the work - err = s.store.QueueAddressedComplete("def", nil) + err = s.store.QueueDelete(w.Permit) c.Assert(err, check.IsNil) found, err = s.store.IsQueueAddressInProgress("def") // Now it should not be found c.Assert(err, check.IsNil) - c.Assert(found, check.Equals, true) + c.Assert(found, check.Equals, false) } diff --git a/pkg/rsqueue/agent/agent.go b/pkg/rsqueue/agent/agent.go index 9173ab9..440ba9f 100644 --- a/pkg/rsqueue/agent/agent.go +++ b/pkg/rsqueue/agent/agent.go @@ -349,8 +349,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j } // Run the job (blocks) - func() { - + jobError := func() (result error) { // Extend the job's heartbeat in the queue periodically until the job completes. done := make(chan struct{}) defer close(done) @@ -362,7 +361,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j case <-ticker.C: // Extend the job visibility a.traceLogger.Debugf("Job of type %d visibility timeout needs to be extended: %d\n", queueWork.WorkType, queueWork.Permit) - err := a.queue.Extend(queueWork.Permit) + err = a.queue.Extend(queueWork.Permit) if err != nil { a.logger.Debugf("Error extending job for work type %d: %s", queueWork.WorkType, err) } @@ -374,24 +373,17 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j // Actually run the job a.traceLogger.Debugf("Running job with type %d, address '%s', and permit '%d'", queueWork.WorkType, queueWork.Address, queueWork.Permit) - err := a.runner.Run(queue.RecursableWork{ + err = a.runner.Run(queue.RecursableWork{ Work: queueWork.Work, WorkType: queueWork.WorkType, Context: ctx, }) if err != nil { + result = err a.logger.Debugf("Job type %d: address: %s work: %#v returned error: %s\n", queueWork.WorkType, queueWork.Address, string(queueWork.Work), err) } - // If the work was addressed, record the result - if queueWork.Address != "" { - // Note, `RecordFailure` will record the error if err != nil. If - // err == nil, then it clears any recorded error for the address. - err = a.queue.RecordFailure(queueWork.Address, err) - if err != nil { - a.logger.Debugf("Failed while recording addressed work success/failure: %s\n", err) - } - } + return }() // Decrement the job count @@ -416,8 +408,8 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j // Delete the job from the queue a.traceLogger.Debugf("Deleting job from queue: %d\n", queueWork.Permit) - if err := a.queue.Delete(queueWork.Permit); err != nil { - a.logger.Debugf("queue Delete() returned error: %s", err) + if deleteErr := a.queue.Delete(queueWork.Permit); deleteErr != nil { + a.logger.Debugf("queue Delete() returned error: %s", deleteErr) } // Notify that work is complete if work is addressed. This must happen after the work has been deleted from the @@ -425,7 +417,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j if queueWork.Address != "" { n := time.Now() a.traceLogger.Debugf("Ready to notify of address %s", queueWork.Address) - notify(agenttypes.NewWorkCompleteNotification(queueWork.Address, a.notifyTypeWorkComplete)) + notify(agenttypes.NewWorkCompleteNotification(queueWork.Address, a.notifyTypeWorkComplete, jobError)) a.traceLogger.Debugf("Notified of address %s in %d ms", queueWork.Address, time.Now().Sub(n).Nanoseconds()/1000000) } diff --git a/pkg/rsqueue/agent/agent_test.go b/pkg/rsqueue/agent/agent_test.go index 112a698..2a855d0 100644 --- a/pkg/rsqueue/agent/agent_test.go +++ b/pkg/rsqueue/agent/agent_test.go @@ -117,7 +117,6 @@ type FakeQueue struct { extend error mutex sync.Mutex record error - errs []error } func (*FakeQueue) Push(priority uint64, groupId int64, work queue.Work) error { @@ -138,12 +137,6 @@ func (*FakeQueue) IsAddressInQueue(address string) (bool, error) { func (f *FakeQueue) PollAddress(address string) (errs <-chan error) { return f.pollErrs } -func (f *FakeQueue) RecordFailure(address string, failure error) error { - if f.record == nil { - f.errs = append(f.errs, failure) - } - return f.record -} func (*FakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) { return nil, nil } @@ -214,9 +207,9 @@ func (s *AgentSuite) TestWaitBlocked(c *check.C) { }() // Send some messages across the queue while blocked and make sure they're discarded - msgs <- agenttypes.NewWorkCompleteNotification("somewhere1", 10) - msgs <- agenttypes.NewWorkCompleteNotification("somewhere2", 10) - msgs <- agenttypes.NewWorkCompleteNotification("somewhere3", 10) + msgs <- agenttypes.NewWorkCompleteNotification("somewhere1", 10, nil) + msgs <- agenttypes.NewWorkCompleteNotification("somewhere2", 10, nil) + msgs <- agenttypes.NewWorkCompleteNotification("somewhere3", 10, nil) c.Check(completed, check.Equals, false) jobDone <- 98 @@ -229,7 +222,6 @@ func (s *AgentSuite) TestWaitBlocked(c *check.C) { func (s *AgentSuite) TestRunJobOk(c *check.C) { q := &FakeQueue{ extended: make(map[permit.Permit]int), - errs: make([]error, 0), } finish := map[string]chan bool{ "one": make(chan bool), @@ -316,11 +308,6 @@ func (s *AgentSuite) TestRunJobOk(c *check.C) { c.Check(receivedPriority[1], check.Equals, MAX_CONCURRENCY) c.Check(receivedPriority[2], check.Equals, MAX_CONCURRENCY) - // Jobs b2 and b3 should have been marked as successful since they were addressed - c.Check(q.errs, check.HasLen, 2) - c.Check(q.errs[0], check.IsNil) - c.Check(q.errs[1], check.IsNil) - // Jobs should have been deleted c.Check(q.deleted, check.Equals, 3) @@ -337,7 +324,6 @@ func (s *AgentSuite) TestRunJobOk(c *check.C) { func (s *AgentSuite) TestRunJobFails(c *check.C) { q := &FakeQueue{ extended: make(map[permit.Permit]int), - errs: make([]error, 0), } finish := map[string]chan bool{ "one": make(chan bool), @@ -363,7 +349,10 @@ func (s *AgentSuite) TestRunJobFails(c *check.C) { b1, err := json.Marshal(&w1) c.Assert(err, check.IsNil) - n := func(n listener.Notification) {} + var notified listener.Notification + n := func(n listener.Notification) { + notified = n + } maxPriorityDone := make(chan struct{}) go func() { @@ -387,8 +376,10 @@ func (s *AgentSuite) TestRunJobFails(c *check.C) { <-maxPriorityDone // Jobs should have been marked as failed since it was addressed - c.Check(q.errs, check.HasLen, 1) - c.Check(q.errs[0], check.ErrorMatches, "work failed") + c.Assert(notified, check.FitsTypeOf, &agenttypes.WorkCompleteNotification{}) + workNotified, ok := notified.(*agenttypes.WorkCompleteNotification) + c.Assert(ok, check.Equals, true) + c.Assert(workNotified.Error, check.ErrorMatches, "work failed") // Jobs should have been deleted c.Check(q.deleted, check.Equals, 1) diff --git a/pkg/rsqueue/agent/types/types.go b/pkg/rsqueue/agent/types/types.go index 9a654c2..318f302 100644 --- a/pkg/rsqueue/agent/types/types.go +++ b/pkg/rsqueue/agent/types/types.go @@ -21,14 +21,16 @@ type Notify func(n listener.Notification) type WorkCompleteNotification struct { listener.GenericNotification Address string + Error error } -func NewWorkCompleteNotification(address string, workType uint8) *WorkCompleteNotification { +func NewWorkCompleteNotification(address string, workType uint8, err error) *WorkCompleteNotification { return &WorkCompleteNotification{ GenericNotification: listener.GenericNotification{ NotifyGuid: uuid.New().String(), NotifyType: workType, }, Address: address, + Error: err, } } diff --git a/pkg/rsqueue/agent/types/types_test.go b/pkg/rsqueue/agent/types/types_test.go index d7a50b9..7ae3c7f 100644 --- a/pkg/rsqueue/agent/types/types_test.go +++ b/pkg/rsqueue/agent/types/types_test.go @@ -3,6 +3,7 @@ package agenttypes // Copyright (C) 2022 by RStudio, PBC import ( + "errors" "testing" "github.com/rstudio/platform-lib/pkg/rsnotify/listener" @@ -16,7 +17,7 @@ func TestPackage(t *testing.T) { check.TestingT(t) } var _ = check.Suite(&AgentTypesSuite{}) func (s *AgentTypesSuite) TestWorkCompleteNotification(c *check.C) { - cn := NewWorkCompleteNotification("some_address", 10) + cn := NewWorkCompleteNotification("some_address", 10, nil) // Guid should be set c.Assert(cn.Guid(), check.Not(check.Equals), "") cn.NotifyGuid = "" @@ -27,4 +28,18 @@ func (s *AgentTypesSuite) TestWorkCompleteNotification(c *check.C) { Address: "some_address", }) + + err := errors.New("some error") + cn = NewWorkCompleteNotification("some_address", 10, err) + // Guid should be set + c.Assert(cn.Guid(), check.Not(check.Equals), "") + cn.NotifyGuid = "" + c.Assert(cn, check.DeepEquals, &WorkCompleteNotification{ + GenericNotification: listener.GenericNotification{ + NotifyType: uint8(10), + }, + + Address: "some_address", + Error: err, + }) } diff --git a/pkg/rsqueue/groups/grouprunner_test.go b/pkg/rsqueue/groups/grouprunner_test.go index 9fa724b..26324b3 100644 --- a/pkg/rsqueue/groups/grouprunner_test.go +++ b/pkg/rsqueue/groups/grouprunner_test.go @@ -121,9 +121,6 @@ func (f *fakeQueue) IsAddressInQueue(address string) (bool, error) { func (f *fakeQueue) PollAddress(address string) (errs <-chan error) { return f.pollErrs } -func (f *fakeQueue) RecordFailure(address string, failure error) error { - return f.record -} func (f *fakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) { return nil, errors.New("n/i") } diff --git a/pkg/rsqueue/queue/queue.go b/pkg/rsqueue/queue/queue.go index bc76dc9..2b64d76 100644 --- a/pkg/rsqueue/queue/queue.go +++ b/pkg/rsqueue/queue/queue.go @@ -35,12 +35,6 @@ type Queue interface { // * work the work to push into the queue. Must be JSON-serializable AddressedPush(priority uint64, groupId int64, address string, work Work) error - // RecordFailure records a failure of addressed work in the queue - // * address the address of the work the failed - // * failure the error that occurred. Overwrites any previous error information - // from earlier runs of the same work. If `failure==nil`, the error is cleared. - RecordFailure(address string, failure error) error - // PollAddress polls addressed work in the queue, and an `errs` channels // to report when the work is done and/or an error has occurred. We pass // `nil` over the errs channel when the poll has completed without errors diff --git a/pkg/rsqueue/queue/recording_producer.go b/pkg/rsqueue/queue/recording_producer.go index bf27e51..cf0fd84 100644 --- a/pkg/rsqueue/queue/recording_producer.go +++ b/pkg/rsqueue/queue/recording_producer.go @@ -91,10 +91,6 @@ func (q *RecordingProducer) PollAddress(address string) (errs <-chan error) { return q.PollErrs } -func (q *RecordingProducer) RecordFailure(address string, failure error) error { - return q.RecordErr -} - func (q *RecordingProducer) Get(maxPriority uint64, maxPriorityChan chan uint64, types QueueSupportedTypes, stop chan bool) (*QueueWork, error) { return nil, kaboom }