From 3b1de797e03cc132b0404adad4efd462b8b7347d Mon Sep 17 00:00:00 2001 From: Bryttanie House Date: Mon, 4 Nov 2024 14:00:34 -0500 Subject: [PATCH] Fixes 4927: add flag to prevent requeuing canceled tasks --- db/migrations.latest | 2 +- ...955_add_cancel_attempted_to_tasks.down.sql | 5 +++ ...15955_add_cancel_attempted_to_tasks.up.sql | 7 ++++ pkg/models/task_info.go | 39 ++++++++++--------- pkg/tasks/queue/pgqueue.go | 22 +++++++++-- pkg/tasks/queue/pgqueue_test.go | 20 ++++++++++ 6 files changed, 72 insertions(+), 23 deletions(-) create mode 100644 db/migrations/20241104115955_add_cancel_attempted_to_tasks.down.sql create mode 100644 db/migrations/20241104115955_add_cancel_attempted_to_tasks.up.sql diff --git a/db/migrations.latest b/db/migrations.latest index aa2d1d216..81a93306d 100644 --- a/db/migrations.latest +++ b/db/migrations.latest @@ -1,2 +1,2 @@ -20241018154315 +20241104115955 diff --git a/db/migrations/20241104115955_add_cancel_attempted_to_tasks.down.sql b/db/migrations/20241104115955_add_cancel_attempted_to_tasks.down.sql new file mode 100644 index 000000000..73222d2c8 --- /dev/null +++ b/db/migrations/20241104115955_add_cancel_attempted_to_tasks.down.sql @@ -0,0 +1,5 @@ +BEGIN; + +ALTER TABLE tasks DROP COLUMN IF EXISTS cancel_attempted; + +COMMIT; diff --git a/db/migrations/20241104115955_add_cancel_attempted_to_tasks.up.sql b/db/migrations/20241104115955_add_cancel_attempted_to_tasks.up.sql new file mode 100644 index 000000000..b5714998b --- /dev/null +++ b/db/migrations/20241104115955_add_cancel_attempted_to_tasks.up.sql @@ -0,0 +1,7 @@ +BEGIN; + +ALTER TABLE tasks ADD COLUMN IF NOT EXISTS cancel_attempted BOOLEAN DEFAULT FALSE; + +UPDATE tasks SET cancel_attempted = true WHERE status = 'canceled'; + +COMMIT; diff --git a/pkg/models/task_info.go b/pkg/models/task_info.go index 82daebd0e..dca8b8180 100644 --- a/pkg/models/task_info.go +++ b/pkg/models/task_info.go @@ -12,25 +12,26 @@ import ( // Shared by DAO and queue packages // GORM only used in DAO to read from table type TaskInfo struct { - Id uuid.UUID `gorm:"primary_key;column:id"` - Typename string `gorm:"column:type"` // "introspect" or "snapshot" - Payload json.RawMessage `gorm:"type:jsonb"` - OrgId string - AccountId string - ObjectUUID uuid.UUID - ObjectType *string - Dependencies pq.StringArray `gorm:"->;column:t_dependencies;type:text[]"` - Dependents pq.StringArray `gorm:"->;column:t_dependents;type:text[]"` - Token uuid.UUID - Queued *time.Time `gorm:"column:queued_at"` - Started *time.Time `gorm:"column:started_at"` - Finished *time.Time `gorm:"column:finished_at"` - Error *string - Status string - RequestID string - Retries int - NextRetryTime *time.Time - Priority int + Id uuid.UUID `gorm:"primary_key;column:id"` + Typename string `gorm:"column:type"` // "introspect" or "snapshot" + Payload json.RawMessage `gorm:"type:jsonb"` + OrgId string + AccountId string + ObjectUUID uuid.UUID + ObjectType *string + Dependencies pq.StringArray `gorm:"->;column:t_dependencies;type:text[]"` + Dependents pq.StringArray `gorm:"->;column:t_dependents;type:text[]"` + Token uuid.UUID + Queued *time.Time `gorm:"column:queued_at"` + Started *time.Time `gorm:"column:started_at"` + Finished *time.Time `gorm:"column:finished_at"` + Error *string + Status string + RequestID string + Retries int + NextRetryTime *time.Time + Priority int + CancelAttempted bool } type TaskInfoRepositoryConfiguration struct { diff --git a/pkg/tasks/queue/pgqueue.go b/pkg/tasks/queue/pgqueue.go index 7f40670ff..1a2eeec2c 100644 --- a/pkg/tasks/queue/pgqueue.go +++ b/pkg/tasks/queue/pgqueue.go @@ -24,7 +24,7 @@ import ( "github.com/rs/zerolog/log" ) -const taskInfoReturning = ` id, type, payload, queued_at, started_at, finished_at, status, error, org_id, object_uuid, object_type, token, request_id, retries, next_retry_time, priority ` // fields to return when returning taskInfo +const taskInfoReturning = ` id, type, payload, queued_at, started_at, finished_at, status, error, org_id, object_uuid, object_type, token, request_id, retries, next_retry_time, priority, cancel_attempted ` // fields to return when returning taskInfo const ( sqlNotify = `NOTIFY tasks` @@ -120,6 +120,10 @@ const ( WHERE id = $1` sqlDeleteAllTasks = ` TRUNCATE task_heartbeats, task_dependencies; DELETE FROM TASKS;` + sqlSetCancelAttempted = ` + UPDATE tasks + SET cancel_attempted = true + WHERE id = $1` ) // These interfaces represent all the interactions with pgxpool that are needed for the pgqueue @@ -409,7 +413,7 @@ func (p *PgQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, taskTypes [ err = tx.QueryRow(ctx, sqlDequeue, token, taskTypes).Scan( &info.Id, &info.Typename, &info.Payload, &info.Queued, &info.Started, &info.Finished, &info.Status, &info.Error, &info.OrgId, &info.ObjectUUID, &info.ObjectType, &info.Token, &info.RequestID, - &info.Retries, &info.NextRetryTime, &info.Priority, + &info.Retries, &info.NextRetryTime, &info.Priority, &info.CancelAttempted, ) if err != nil { return nil, fmt.Errorf("error during dequeue query: %w", err) @@ -473,7 +477,7 @@ func (p *PgQueue) Status(taskId uuid.UUID) (*models.TaskInfo, error) { err = conn.QueryRow(context.Background(), sqlQueryTaskStatus, taskId).Scan( &info.Id, &info.Typename, &info.Payload, &info.Queued, &info.Started, &info.Finished, &info.Status, &info.Error, &info.OrgId, &info.ObjectUUID, &info.ObjectType, &info.Token, &info.RequestID, - &info.Retries, &info.NextRetryTime, &info.Priority, + &info.Retries, &info.NextRetryTime, &info.Priority, &info.CancelAttempted, ) if err != nil { return nil, err @@ -669,6 +673,9 @@ func (p *PgQueue) Requeue(taskId uuid.UUID) error { if err == pgx.ErrNoRows { return ErrNotExist } + if info.CancelAttempted { + return ErrTaskCanceled + } if info.Started == nil || info.Finished != nil { return ErrNotRunning } @@ -877,6 +884,10 @@ func (p *PgQueue) ListenForCancel(ctx context.Context, taskID uuid.UUID, cancelF // Cancel context only if context has not already been canceled. If the context has already been canceled, the task has finished. if !errors.Is(ErrNotRunning, context.Cause(ctx)) { + if err := p.setCancelAttempted(taskID); err != nil { + logger.Error().Err(err).Msg("ListenForCancel: error setting cancel_attempted") + return + } logger.Debug().Msg("[Canceled Task]") cancelFunc(ErrTaskCanceled) } @@ -889,3 +900,8 @@ func isContextCancelled(ctx context.Context) bool { func getCancelChannelName(taskID uuid.UUID) string { return strings.Replace("task_"+taskID.String(), "-", "", -1) } + +func (p *PgQueue) setCancelAttempted(taskID uuid.UUID) error { + _, err := p.Pool.Exec(context.Background(), sqlSetCancelAttempted, taskID) + return err +} diff --git a/pkg/tasks/queue/pgqueue_test.go b/pkg/tasks/queue/pgqueue_test.go index 2e26189aa..febd4017d 100644 --- a/pkg/tasks/queue/pgqueue_test.go +++ b/pkg/tasks/queue/pgqueue_test.go @@ -315,6 +315,26 @@ func (s *QueueSuite) TestRequeueFailedTasks() { assert.True(s.T(), info.Queued.After(*originalQueueTime)) } +func (s *QueueSuite) TestCannotRequeueCanceledTasks() { + id, err := s.queue.Enqueue(&testTask) + require.NoError(s.T(), err) + assert.NotEqual(s.T(), uuid.Nil, id) + + _, err = s.queue.Status(id) + require.NoError(s.T(), err) + + _, err = s.queue.Dequeue(context.Background(), []string{testTaskType}) + require.NoError(s.T(), err) + + err = s.queue.Cancel(context.Background(), id) + require.NoError(s.T(), err) + err = s.queue.setCancelAttempted(id) + require.NoError(s.T(), err) + + err = s.queue.Requeue(id) + assert.ErrorIs(s.T(), err, ErrTaskCanceled) +} + func (s *QueueSuite) TestRequeueFailedTasksExceedRetries() { config.Get().Tasking.RetryWaitUpperBound = 0