From b3c2cb9bfbc6c362362008d2b6e20f59ca766f34 Mon Sep 17 00:00:00 2001 From: Shingo Omura Date: Thu, 6 Aug 2020 23:56:19 +0900 Subject: [PATCH] add 'processing' filter in get-task command --- README.md | 2 +- cmd/get_task.go | 3 +++ pkg/backend/iface/backend.go | 1 + pkg/backend/redis/task.go | 21 +++++++++++++++++++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 131709f..8ca57ea 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ - [Managing configurations](#managing-configurations) * [Command line flags](#command-line-flags) * [Eenvironment variables](#eenvironment-variables) - * [Config files](#config-files) + * [Config file](#config-file) - [Backend configuration reference](#backend-configuration-reference) * [Redis](#redis) - [Bash/Zsh completion](#bashzsh-completion) diff --git a/cmd/get_task.go b/cmd/get_task.go index 1d53f4b..3112223 100644 --- a/cmd/get_task.go +++ b/cmd/get_task.go @@ -33,6 +33,7 @@ var ( possibleTaskState = []string{ "all", "pending", + "processing", "completed", "succeeded", "failed", @@ -71,6 +72,8 @@ var getTaskCmd = &cobra.Command{ ts, err = queueBackend.GetAllTasks(cmdContext, queueName) case "pending": ts, err = queueBackend.GetPendingTasks(cmdContext, queueName) + case "processing": + ts, err = queueBackend.GetProcessingTasks(cmdContext, queueName) case "completed": ts, err = queueBackend.GetCompletedTasks(cmdContext, queueName) case "succeeded": diff --git a/pkg/backend/iface/backend.go b/pkg/backend/iface/backend.go index 52b986d..69b9a6f 100644 --- a/pkg/backend/iface/backend.go +++ b/pkg/backend/iface/backend.go @@ -61,6 +61,7 @@ type Backend interface { AddTask(ctx context.Context, queueName string, spec task.TaskSpec) (*task.Task, error) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) (*task.Task, error) GetAllTasks(ctx context.Context, queueName string) ([]*task.Task, error) + GetProcessingTasks(ctx context.Context, queueName string) ([]*task.Task, error) GetPendingTasks(ctx context.Context, queueName string) ([]*task.Task, error) GetReceivedTasks(ctx context.Context, queueName string) ([]*task.Task, error) GetCompletedTasks(ctx context.Context, queueName string) ([]*task.Task, error) diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index 3180427..92d845f 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -225,6 +225,27 @@ func (b *Backend) GetReceivedTasks(ctx context.Context, queueName string) ([]*ta ) } +func (b *Backend) GetProcessingTasks(ctx context.Context, queueName string) ([]*task.Task, error) { + if err := taskqueue.ValidateQueueName(queueName); err != nil { + return nil, err + } + queue, err := b.ensureQueueExistsByName(b.Client, queueName) + if err != nil { + return nil, err + } + return b.getTasks( + queue.UID.String(), + func(t *task.Task) bool { + return t.Status.Phase == task.TaskPhaseProcessing + }, + b.Logger.With(). + Str("queueName", queue.Spec.Name). + Str("queueUID", queue.UID.String()). + Str("operation", "GetProcessingTasks"). + Logger(), + ) +} + func (b *Backend) GetCompletedTasks(ctx context.Context, queueName string) ([]*task.Task, error) { if err := taskqueue.ValidateQueueName(queueName); err != nil { return nil, err