Skip to content

Commit

Permalink
Merge branch 'master' into more-rl-frac
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdavidsmith authored Nov 5, 2024
2 parents bae8777 + 7fe5217 commit 7855d8c
Show file tree
Hide file tree
Showing 20 changed files with 2,140 additions and 146 deletions.
43 changes: 43 additions & 0 deletions cmd/armadactl/cmd/cancel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"fmt"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
Expand All @@ -16,6 +18,7 @@ func cancelCmd() *cobra.Command {
cmd.AddCommand(
cancelJobCmd(),
cancelJobSetCmd(),
cancelExecutorCmd(),
)
return cmd
}
Expand Down Expand Up @@ -58,3 +61,43 @@ func cancelJobSetCmd() *cobra.Command {
}
return cmd
}

func cancelExecutorCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "executor <executor>",
Short: "Cancels jobs on executor.",
Long: `Cancels jobs on executor with provided executor name, priority classes and queues.`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return fmt.Errorf("error marking priority-class flag as required: %s", err)
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
onExecutor := args[0]

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading priority-class selection: %s", err)
}

queues, err := cmd.Flags().GetStringSlice("queues")
if err != nil {
return fmt.Errorf("error reading queue selection: %s", err)
}

return a.CancelOnExecutor(onExecutor, queues, priorityClasses)
},
}

cmd.Flags().StringSliceP(
"queues",
"q",
[]string{},
"Cancel jobs on executor matching the specified queue names. If no queues are provided, jobs across all queues will be cancelled. Provided queues should be comma separated, as in the following example: queueA,queueB,queueC.",
)
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Cancel jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
}
3 changes: 3 additions & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,8 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
params.ExecutorAPI.Cordon = ce.CordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.Uncordon = ce.UncordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)

params.ExecutorAPI.CancelOnExecutor = ce.CancelOnExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.PreemptOnExecutor = ce.PreemptOnExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)

return nil
}
46 changes: 45 additions & 1 deletion cmd/armadactl/cmd/preempt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"fmt"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
Expand All @@ -12,7 +14,10 @@ func preemptCmd() *cobra.Command {
Short: "Preempt jobs in armada.",
Args: cobra.ExactArgs(0),
}
cmd.AddCommand(preemptJobCmd())
cmd.AddCommand(
preemptJobCmd(),
preemptExecutorCmd(),
)
return cmd
}

Expand All @@ -35,3 +40,42 @@ func preemptJobCmd() *cobra.Command {
}
return cmd
}

func preemptExecutorCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "executor <executor>",
Short: "Preempts jobs on executor.",
Long: `Preempts jobs on executor with provided executor name, priority classes and queues.`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return fmt.Errorf("error marking priority-class flag as required: %s", err)
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
onExecutor := args[0]

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading priority-class selection: %s", err)
}

queues, err := cmd.Flags().GetStringSlice("queues")
if err != nil {
return fmt.Errorf("error reading queue selection: %s", err)
}

return a.PreemptOnExecutor(onExecutor, queues, priorityClasses)
},
}
cmd.Flags().StringSliceP(
"queues",
"q",
[]string{},
"Preempt jobs on executor matching the specified queue names. If no queues are provided, jobs across all queues will be preempted. Provided queues should be comma separated, as in the following example: queueA,queueB,queueC.",
)
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Preempt jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
}
1 change: 1 addition & 0 deletions developer/config/insecure-armada.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ auth:
cordon_queue: ["everyone"]
cancel_any_jobs: ["everyone"]
reprioritize_any_jobs: ["everyone"]
preempt_any_jobs: ["everyone"]
watch_all_events: ["everyone"]
execute_jobs: ["everyone"]
update_executor_settings: ["everyone"]
6 changes: 4 additions & 2 deletions internal/armadactl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ type QueueAPI struct {
}

type ExecutorAPI struct {
Cordon executor.CordonAPI
Uncordon executor.UncordonAPI
Cordon executor.CordonAPI
Uncordon executor.UncordonAPI
CancelOnExecutor executor.CancelAPI
PreemptOnExecutor executor.PreemptAPI
}

// New instantiates an App with default parameters, including standard output
Expand Down
20 changes: 20 additions & 0 deletions internal/armadactl/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client"
)
Expand Down Expand Up @@ -53,3 +54,22 @@ func (a *App) CancelJobSet(queue string, jobSetId string) (outerErr error) {
return nil
})
}

func (a *App) CancelOnExecutor(executor string, queues []string, priorityClasses []string) error {
queueMsg := strings.Join(queues, ",")
priorityClassesMsg := strings.Join(priorityClasses, ",")
// If the provided slice of queues is empty, jobs on all queues will be cancelled
if len(queues) == 0 {
apiQueues, err := a.getAllQueuesAsAPIQueue(&QueueQueryArgs{})
if err != nil {
return fmt.Errorf("error cancelling jobs on executor %s: %s", executor, err)
}
queues = armadaslices.Map(apiQueues, func(q *api.Queue) string { return q.Name })
queueMsg = "all"
}
fmt.Fprintf(a.Out, "Requesting cancellation of jobs matching executor: %s, queues: %s, priority-classes: %s\n", executor, queueMsg, priorityClassesMsg)
if err := a.Params.ExecutorAPI.CancelOnExecutor(executor, queues, priorityClasses); err != nil {
return fmt.Errorf("error cancelling jobs on executor %s: %s", executor, err)
}
return nil
}
22 changes: 22 additions & 0 deletions internal/armadactl/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package armadactl

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client"
)
Expand All @@ -32,3 +34,23 @@ func (a *App) Preempt(queue string, jobSetId string, jobId string) (outerErr err
return nil
})
}

func (a *App) PreemptOnExecutor(executor string, queues []string, priorityClasses []string) error {
queueMsg := strings.Join(queues, ",")
priorityClassesMsg := strings.Join(priorityClasses, ",")
// If the provided slice of queues is empty, jobs on all queues will be cancelled
if len(queues) == 0 {
apiQueues, err := a.getAllQueuesAsAPIQueue(&QueueQueryArgs{})
if err != nil {
return fmt.Errorf("error preempting jobs on executor %s: %s", executor, err)
}
queues = armadaslices.Map(apiQueues, func(q *api.Queue) string { return q.Name })
queueMsg = "all"
}

fmt.Fprintf(a.Out, "Requesting preemption of jobs matching executor: %s, queues: %s, priority-classes: %s\n", executor, queueMsg, priorityClassesMsg)
if err := a.Params.ExecutorAPI.PreemptOnExecutor(executor, queues, priorityClasses); err != nil {
return fmt.Errorf("error preempting jobs on executor %s: %s", executor, err)
}
return nil
}
20 changes: 20 additions & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,26 @@ var DeleteExecutorSettings = &controlplaneevents.Event{
},
}

var PreemptOnExecutor = &controlplaneevents.Event{
Event: &controlplaneevents.Event_PreemptOnExecutor{
PreemptOnExecutor: &controlplaneevents.PreemptOnExecutor{
Name: ExecutorId,
Queues: []string{Queue},
PriorityClasses: []string{PriorityClassName},
},
},
}

var CancelOnExecutor = &controlplaneevents.Event{
Event: &controlplaneevents.Event_CancelOnExecutor{
CancelOnExecutor: &controlplaneevents.CancelOnExecutor{
Name: ExecutorId,
Queues: []string{Queue},
PriorityClasses: []string{PriorityClassName},
},
},
}

func JobSetCancelRequestedWithStateFilter(states ...armadaevents.JobState) *armadaevents.EventSequence_Event {
return &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Expand Down
Loading

0 comments on commit 7855d8c

Please sign in to comment.