Skip to content

Commit

Permalink
Merge pull request #21 from pfnet-research/add-processing-in-get-tasks
Browse files Browse the repository at this point in the history
add 'processing' filter in get-task command
  • Loading branch information
everpeace authored Aug 6, 2020
2 parents 926adbd + b3c2cb9 commit 6d048aa
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
- [Managing configurations](#managing-configurations)
* [Command line flags](#command-line-flags)
* [Eenvironment variables](#eenvironment-variables)
* [Config files](#config-files)
* [Config file](#config-file)
- [Backend configuration reference](#backend-configuration-reference)
* [Redis](#redis)
- [Bash/Zsh completion](#bashzsh-completion)
Expand Down
3 changes: 3 additions & 0 deletions cmd/get_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
possibleTaskState = []string{
"all",
"pending",
"processing",
"completed",
"succeeded",
"failed",
Expand Down Expand Up @@ -71,6 +72,8 @@ var getTaskCmd = &cobra.Command{
ts, err = queueBackend.GetAllTasks(cmdContext, queueName)
case "pending":
ts, err = queueBackend.GetPendingTasks(cmdContext, queueName)
case "processing":
ts, err = queueBackend.GetProcessingTasks(cmdContext, queueName)
case "completed":
ts, err = queueBackend.GetCompletedTasks(cmdContext, queueName)
case "succeeded":
Expand Down
1 change: 1 addition & 0 deletions pkg/backend/iface/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Backend interface {
AddTask(ctx context.Context, queueName string, spec task.TaskSpec) (*task.Task, error)
NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) (*task.Task, error)
GetAllTasks(ctx context.Context, queueName string) ([]*task.Task, error)
GetProcessingTasks(ctx context.Context, queueName string) ([]*task.Task, error)
GetPendingTasks(ctx context.Context, queueName string) ([]*task.Task, error)
GetReceivedTasks(ctx context.Context, queueName string) ([]*task.Task, error)
GetCompletedTasks(ctx context.Context, queueName string) ([]*task.Task, error)
Expand Down
21 changes: 21 additions & 0 deletions pkg/backend/redis/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ func (b *Backend) GetReceivedTasks(ctx context.Context, queueName string) ([]*ta
)
}

func (b *Backend) GetProcessingTasks(ctx context.Context, queueName string) ([]*task.Task, error) {
if err := taskqueue.ValidateQueueName(queueName); err != nil {
return nil, err
}
queue, err := b.ensureQueueExistsByName(b.Client, queueName)
if err != nil {
return nil, err
}
return b.getTasks(
queue.UID.String(),
func(t *task.Task) bool {
return t.Status.Phase == task.TaskPhaseProcessing
},
b.Logger.With().
Str("queueName", queue.Spec.Name).
Str("queueUID", queue.UID.String()).
Str("operation", "GetProcessingTasks").
Logger(),
)
}

func (b *Backend) GetCompletedTasks(ctx context.Context, queueName string) ([]*task.Task, error) {
if err := taskqueue.ValidateQueueName(queueName); err != nil {
return nil, err
Expand Down

0 comments on commit 6d048aa

Please sign in to comment.