Skip to content

Commit

Permalink
Merge master
Browse files Browse the repository at this point in the history
Signed-off-by: JamesMurkin <[email protected]>
  • Loading branch information
JamesMurkin committed Nov 5, 2024
2 parents edd7614 + 0f87b66 commit a2891ea
Show file tree
Hide file tree
Showing 54 changed files with 4,169 additions and 246 deletions.
16 changes: 2 additions & 14 deletions .run/Armada.run.xml
Original file line number Diff line number Diff line change
@@ -1,24 +1,12 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Armada" type="CompoundRunConfigurationType">
<toRun name="Armada Infrastructure Services" type="docker-deploy" />
<toRun name="Event Ingester" type="GoApplicationRunConfiguration" />
<toRun name="Lookout Ingester V2" type="GoApplicationRunConfiguration" />
<toRun name="LookoutV2" type="GoApplicationRunConfiguration" />
<toRun name="Executor" type="GoApplicationRunConfiguration" />
<toRun name="Server" type="GoApplicationRunConfiguration" />
<toRun name="Scheduler" type="GoApplicationRunConfiguration" />
<toRun name="Scheduler Ingester" type="GoApplicationRunConfiguration" />
<toRun name="schedulerPostgresMigration" type="GoApplicationRunConfiguration" />
<method v="2" />
</configuration>
<configuration default="false" name="Armada (Pulsar Scheduler)" type="CompoundRunConfigurationType">
<toRun name="Event Ingester" type="GoApplicationRunConfiguration" />
<toRun name="Lookout Ingester V2" type="GoApplicationRunConfiguration" />
<toRun name="LookoutV2" type="GoApplicationRunConfiguration" />
<toRun name="Executor" type="GoApplicationRunConfiguration" />
<toRun name="Server" type="GoApplicationRunConfiguration" />
<toRun name="Scheduler" type="GoApplicationRunConfiguration" />
<toRun name="Scheduler Ingester" type="GoApplicationRunConfiguration" />
<toRun name="Server" type="GoApplicationRunConfiguration" />
<method v="2" />
</configuration>
</component>
</component>
43 changes: 43 additions & 0 deletions cmd/armadactl/cmd/cancel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"fmt"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
Expand All @@ -16,6 +18,7 @@ func cancelCmd() *cobra.Command {
cmd.AddCommand(
cancelJobCmd(),
cancelJobSetCmd(),
cancelExecutorCmd(),
)
return cmd
}
Expand Down Expand Up @@ -58,3 +61,43 @@ func cancelJobSetCmd() *cobra.Command {
}
return cmd
}

// cancelExecutorCmd builds the "executor <executor>" subcommand of the cancel
// command. It takes exactly one positional argument, the executor name, and
// forwards it together with the --queues and --priority-classes selections to
// App.CancelOnExecutor.
//
// The --priority-classes flag is marked as required in PreRunE. The --queues
// flag defaults to an empty list; per its help text, an empty list means jobs
// across all queues are cancelled.
func cancelExecutorCmd() *cobra.Command {
	a := armadactl.New()
	cmd := &cobra.Command{
		Use:   "executor <executor>",
		Short: "Cancels jobs on executor.",
		Long:  `Cancels jobs on executor with provided executor name, priority classes and queues.`,
		Args:  cobra.ExactArgs(1),
		PreRunE: func(cmd *cobra.Command, args []string) error {
			// Wrap with %w so callers can still inspect the underlying error.
			if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
				return fmt.Errorf("error marking priority-class flag as required: %w", err)
			}
			return initParams(cmd, a.Params)
		},
		RunE: func(cmd *cobra.Command, args []string) error {
			onExecutor := args[0]

			priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
			if err != nil {
				return fmt.Errorf("error reading priority-class selection: %w", err)
			}

			queues, err := cmd.Flags().GetStringSlice("queues")
			if err != nil {
				return fmt.Errorf("error reading queue selection: %w", err)
			}

			return a.CancelOnExecutor(onExecutor, queues, priorityClasses)
		},
	}

	cmd.Flags().StringSliceP(
		"queues",
		"q",
		[]string{},
		"Cancel jobs on executor matching the specified queue names. If no queues are provided, jobs across all queues will be cancelled. Provided queues should be comma separated, as in the following example: queueA,queueB,queueC.",
	)
	cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Cancel jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
	return cmd
}
3 changes: 3 additions & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,8 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
params.ExecutorAPI.Cordon = ce.CordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.Uncordon = ce.UncordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)

params.ExecutorAPI.CancelOnExecutor = ce.CancelOnExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.PreemptOnExecutor = ce.PreemptOnExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)

return nil
}
55 changes: 46 additions & 9 deletions cmd/armadactl/cmd/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,69 @@ func preemptCmd() *cobra.Command {
Short: "Preempt jobs in armada.",
Args: cobra.ExactArgs(0),
}
cmd.Flags().String("reason", "", "Reason for preemption")
cmd.AddCommand(preemptJobCmd())
cmd.AddCommand(
preemptJobCmd(),
preemptExecutorCmd(),
)
return cmd
}

func preemptJobCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "job <queue> <job-set> <job-id>",
Use: "job <queue> <job-set> <job-id> <preempt-reason>",
Short: "Preempt an armada job.",
Long: `Preempt a job by providing it's queue, jobset and jobId.`,
Args: cobra.ExactArgs(3),
Long: `Preempt a job by providing it's queue, jobset, jobId and a preemption reason.`,
Args: cobra.ExactArgs(4),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
queue := args[0]
jobSetId := args[1]
jobId := args[2]
reason, err := cmd.Flags().GetString("reason")
reason := args[3]
return a.Preempt(queue, jobSetId, jobId, reason)
},
}
return cmd
}

func preemptExecutorCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "executor <executor>",
Short: "Preempts jobs on executor.",
Long: `Preempts jobs on executor with provided executor name, priority classes and queues.`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return fmt.Errorf("error marking priority-class flag as required: %s", err)
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
onExecutor := args[0]

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading reason: %s", err)
return fmt.Errorf("error reading priority-class selection: %s", err)
}
return a.Preempt(queue, jobSetId, jobId, reason)

queues, err := cmd.Flags().GetStringSlice("queues")
if err != nil {
return fmt.Errorf("error reading queue selection: %s", err)
}

return a.PreemptOnExecutor(onExecutor, queues, priorityClasses)
},
}
cmd.Flags().String("reason", "", "Reason for preemption")
cmd.Flags().StringSliceP(
"queues",
"q",
[]string{},
"Preempt jobs on executor matching the specified queue names. If no queues are provided, jobs across all queues will be preempted. Provided queues should be comma separated, as in the following example: queueA,queueB,queueC.",
)
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Preempt jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
}
91 changes: 91 additions & 0 deletions cmd/lookoutingesterv2/dbloadtester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package main

import (
"fmt"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"sigs.k8s.io/yaml"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/app"
"github.com/armadaproject/armada/internal/lookoutingesterv2/configuration"
"github.com/armadaproject/armada/internal/lookoutingesterv2/dbloadtester"
)

// init registers the lookoutIngesterConfig command-line flag and parses the
// command line before main runs.
func init() {
	const (
		configFlag  = "lookoutIngesterConfig"
		configUsage = "Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)"
	)

	pflag.StringSlice(configFlag, []string{}, configUsage)
	pflag.Parse()
}

// ReportTemplate is the fmt format string main uses to print the load-test
// report. Its verbs are filled in order with: the run date, the load tester
// settings (total jobs, concurrent jobs, batch size, queue names), the
// marshalled ingester configuration, and the measured results (durations,
// event count, and the two derived per-event / per-second figures).
const ReportTemplate string = `
Load Test on LookoutIngester at %s
Configuration:
Total Jobs Simulated: %d
Total Concurrent Jobs Simulated: %d
Maximum Batch of Jobs Per Queue: %d
Queues in Use: %s
LookoutIngester Config:
%s
Results:
Total Load Test Duration: %s
Total DB Insertion Duration: %s
Number of Events Processed: %d
Average DB Insertion Time Per Event: %f milliseconds
Events Processed By DB Per Second: %f events
`

// main loads the lookout ingester configuration, runs the database load test
// with a fixed, hard-coded workload, and prints a report built from
// ReportTemplate to standard output.
//
// If the load test itself fails the process exits with a non-zero status and
// no report is printed, because the returned results are not meaningful on
// error. Failure to marshal the configuration for the report is only logged.
func main() {
	common.ConfigureLogging()
	common.BindCommandlineArguments()

	var config configuration.LookoutIngesterV2Configuration
	userSpecifiedConfigs := viper.GetStringSlice("lookoutIngesterConfig")
	common.LoadConfig(&config, "./config/lookoutingesterv2", userSpecifiedConfigs)

	loadtesterConfig := dbloadtester.Config{
		TotalJobs:            500000,
		TotalConcurrentJobs:  50000,
		QueueSubmitBatchSize: 300,
		QueueNames:           []string{"queue1", "queue2", "queue3"},
		JobTemplateFile:      "internal/lookoutingesterv2/dbloadtester/test_data.yaml",
	}

	loadtester := dbloadtester.Setup(
		config,
		loadtesterConfig,
	)

	results, err := loadtester.Run(app.CreateContextWithShutdown())
	if err != nil {
		// Do not read results after a failed run: stop here rather than
		// printing a report based on a value that carries no meaning.
		log.Fatalf("Ingestion simulator failed: %v", err)
	}

	liConfig, err := yaml.Marshal(config)
	if err != nil {
		// Best effort: the report is still printed without a usable config dump.
		log.Warnf("Failed to marshal lookout ingester config for report output: %v", err)
	}

	fmt.Printf(
		ReportTemplate,
		time.Now().Format("2006-01-02"),
		loadtesterConfig.TotalJobs,
		loadtesterConfig.TotalConcurrentJobs,
		loadtesterConfig.QueueSubmitBatchSize,
		loadtesterConfig.QueueNames,
		string(liConfig),
		results.TotalTestDuration,
		results.TotalDBInsertionDuration,
		results.TotalEventsProcessed,
		float64(results.TotalDBInsertionDuration.Milliseconds())/float64(results.TotalEventsProcessed),
		float64(results.TotalEventsProcessed)/float64(results.TotalDBInsertionDuration.Seconds()),
	)
}
1 change: 1 addition & 0 deletions developer/config/insecure-armada.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ auth:
cordon_queue: ["everyone"]
cancel_any_jobs: ["everyone"]
reprioritize_any_jobs: ["everyone"]
preempt_any_jobs: ["everyone"]
watch_all_events: ["everyone"]
execute_jobs: ["everyone"]
update_executor_settings: ["everyone"]
6 changes: 4 additions & 2 deletions internal/armadactl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ type QueueAPI struct {
}

type ExecutorAPI struct {
Cordon executor.CordonAPI
Uncordon executor.UncordonAPI
Cordon executor.CordonAPI
Uncordon executor.UncordonAPI
CancelOnExecutor executor.CancelAPI
PreemptOnExecutor executor.PreemptAPI
}

// New instantiates an App with default parameters, including standard output
Expand Down
20 changes: 20 additions & 0 deletions internal/armadactl/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client"
)
Expand Down Expand Up @@ -53,3 +54,22 @@ func (a *App) CancelJobSet(queue string, jobSetId string) (outerErr error) {
return nil
})
}

// CancelOnExecutor requests cancellation of jobs on the given executor that
// match the provided queues and priority classes.
//
// When queues is empty, the names of all queues returned by
// getAllQueuesAsAPIQueue are used instead, and the progress message reports
// the selection as "all". The progress message is written to a.Out before the
// request is sent. Errors from the queue lookup or from the executor API are
// wrapped (with %w) and returned.
func (a *App) CancelOnExecutor(executor string, queues []string, priorityClasses []string) error {
	queueMsg := strings.Join(queues, ",")
	priorityClassesMsg := strings.Join(priorityClasses, ",")
	// If the provided slice of queues is empty, jobs on all queues will be cancelled
	if len(queues) == 0 {
		apiQueues, err := a.getAllQueuesAsAPIQueue(&QueueQueryArgs{})
		if err != nil {
			return fmt.Errorf("error cancelling jobs on executor %s: %w", executor, err)
		}
		queues = armadaslices.Map(apiQueues, func(q *api.Queue) string { return q.Name })
		queueMsg = "all"
	}
	fmt.Fprintf(a.Out, "Requesting cancellation of jobs matching executor: %s, queues: %s, priority-classes: %s\n", executor, queueMsg, priorityClassesMsg)
	if err := a.Params.ExecutorAPI.CancelOnExecutor(executor, queues, priorityClasses); err != nil {
		return fmt.Errorf("error cancelling jobs on executor %s: %w", executor, err)
	}
	return nil
}
22 changes: 22 additions & 0 deletions internal/armadactl/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package armadactl

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client"
)
Expand Down Expand Up @@ -33,3 +35,23 @@ func (a *App) Preempt(queue string, jobSetId string, jobId string, reason string
return nil
})
}

// PreemptOnExecutor requests preemption of jobs on the given executor that
// match the provided queues and priority classes.
//
// When queues is empty, the names of all queues returned by
// getAllQueuesAsAPIQueue are used instead, and the progress message reports
// the selection as "all". The progress message is written to a.Out before the
// request is sent. Errors from the queue lookup or from the executor API are
// wrapped (with %w) and returned.
func (a *App) PreemptOnExecutor(executor string, queues []string, priorityClasses []string) error {
	queueMsg := strings.Join(queues, ",")
	priorityClassesMsg := strings.Join(priorityClasses, ",")
	// If the provided slice of queues is empty, jobs on all queues will be preempted
	if len(queues) == 0 {
		apiQueues, err := a.getAllQueuesAsAPIQueue(&QueueQueryArgs{})
		if err != nil {
			return fmt.Errorf("error preempting jobs on executor %s: %w", executor, err)
		}
		queues = armadaslices.Map(apiQueues, func(q *api.Queue) string { return q.Name })
		queueMsg = "all"
	}

	fmt.Fprintf(a.Out, "Requesting preemption of jobs matching executor: %s, queues: %s, priority-classes: %s\n", executor, queueMsg, priorityClassesMsg)
	if err := a.Params.ExecutorAPI.PreemptOnExecutor(executor, queues, priorityClasses); err != nil {
		return fmt.Errorf("error preempting jobs on executor %s: %w", executor, err)
	}
	return nil
}
3 changes: 3 additions & 0 deletions internal/common/ingest/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (b *Batcher[T]) Run(ctx *armadacontext.Context) {
case value, ok := <-b.input:
if !ok {
// input channel has closed
if totalNumberOfItems > 0 {
b.publish <- b.buffer
}
return
}
b.mutex.Lock()
Expand Down
20 changes: 20 additions & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,26 @@ var DeleteExecutorSettings = &controlplaneevents.Event{
},
}

// PreemptOnExecutor is a test fixture: a control-plane event wrapping a
// PreemptOnExecutor message that names the fixture executor (ExecutorId),
// a single queue (Queue) and a single priority class (PriorityClassName).
var PreemptOnExecutor = &controlplaneevents.Event{
Event: &controlplaneevents.Event_PreemptOnExecutor{
PreemptOnExecutor: &controlplaneevents.PreemptOnExecutor{
Name: ExecutorId,
Queues: []string{Queue},
PriorityClasses: []string{PriorityClassName},
},
},
}

// CancelOnExecutor is a test fixture: a control-plane event wrapping a
// CancelOnExecutor message that names the fixture executor (ExecutorId),
// a single queue (Queue) and a single priority class (PriorityClassName).
var CancelOnExecutor = &controlplaneevents.Event{
Event: &controlplaneevents.Event_CancelOnExecutor{
CancelOnExecutor: &controlplaneevents.CancelOnExecutor{
Name: ExecutorId,
Queues: []string{Queue},
PriorityClasses: []string{PriorityClassName},
},
},
}

func JobSetCancelRequestedWithStateFilter(states ...armadaevents.JobState) *armadaevents.EventSequence_Event {
return &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Expand Down
Loading

0 comments on commit a2891ea

Please sign in to comment.