Skip to content

Commit

Permalink
Merging master into branch
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafai-gr committed Nov 9, 2024
2 parents afce23a + 43f8573 commit 7b9ff1d
Show file tree
Hide file tree
Showing 55 changed files with 2,048 additions and 1,286 deletions.
9 changes: 5 additions & 4 deletions cmd/armadactl/cmd/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,19 @@ func preemptCmd() *cobra.Command {
func preemptJobCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "job <queue> <job-set> <job-id>",
Use: "job <queue> <job-set> <job-id> <preempt-reason>",
Short: "Preempt an armada job.",
Long: `Preempt a job by providing it's queue, jobset and jobId.`,
Args: cobra.ExactArgs(3),
Long: `Preempt a job by providing it's queue, jobset, jobId and a preemption reason.`,
Args: cobra.ExactArgs(4),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
queue := args[0]
jobSetId := args[1]
jobId := args[2]
return a.Preempt(queue, jobSetId, jobId)
reason := args[3]
return a.Preempt(queue, jobSetId, jobId, reason)
},
}
return cmd
Expand Down
3 changes: 2 additions & 1 deletion internal/armadactl/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// Preempt a job.
func (a *App) Preempt(queue string, jobSetId string, jobId string) (outerErr error) {
func (a *App) Preempt(queue string, jobSetId string, jobId string, reason string) (outerErr error) {
apiConnectionDetails := a.Params.ApiConnectionDetails

fmt.Fprintf(a.Out, "Requesting preemption of job matching queue: %s, job set: %s, and job Id: %s\n", queue, jobSetId, jobId)
Expand All @@ -25,6 +25,7 @@ func (a *App) Preempt(queue string, jobSetId string, jobId string) (outerErr err
JobIds: []string{jobId},
JobSetId: jobSetId,
Queue: queue,
Reason: reason,
})
if err != nil {
return errors.Wrapf(err, "error preempting job matching queue: %s, job set: %s, and job id: %s", queue, jobSetId, jobId)
Expand Down
2 changes: 2 additions & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
DebugMsg = "sample debug message"
LeaseReturnedMsg = "lease returned error message"
UnschedulableMsg = "test pod is unschedulable"
PreemptionReason = "job preempted"
PartitionMarkerPartitionId = 456

ExecutorCordonReason = "bad executor"
Expand Down Expand Up @@ -367,6 +368,7 @@ var JobRunPreempted = &armadaevents.EventSequence_Event{
JobRunPreempted: &armadaevents.JobRunPreempted{
PreemptedJobId: JobId,
PreemptedRunId: RunId,
Reason: PreemptionReason,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion internal/lookoutingesterv2/instructions/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (c *InstructionConverter) handleJobRunPreempted(ts time.Time, event *armada
RunId: event.PreemptedRunId,
JobRunState: pointer.Int32(lookout.JobRunPreemptedOrdinal),
Finished: &ts,
Error: tryCompressError(event.PreemptedJobId, "preempted", c.compressor),
Error: tryCompressError(event.PreemptedJobId, event.Reason, c.compressor),
}
update.JobRunsToUpdate = append(update.JobRunsToUpdate, &jobRun)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ var expectedPreemptedRun = model.UpdateJobRunInstruction{
RunId: testfixtures.RunId,
Finished: &testfixtures.BaseTime,
JobRunState: pointer.Int32(lookout.JobRunPreemptedOrdinal),
Error: []byte("preempted"),
Error: []byte(testfixtures.PreemptionReason),
}

var expectedCancelledRun = model.UpdateJobRunInstruction{
Expand Down
36 changes: 14 additions & 22 deletions internal/scheduler/floatingresources/floating_resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import (

"github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type FloatingResourceTypes struct {
zeroFloatingResources schedulerobjects.ResourceList
pools map[string]*floatingResourcePool
rlFactory *internaltypes.ResourceListFactory
}

type floatingResourcePool struct {
totalResources schedulerobjects.ResourceList
}

func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig) (*FloatingResourceTypes, error) {
func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))}
for _, c := range config {
if _, exists := zeroFloatingResources.Resources[c.Name]; exists {
Expand Down Expand Up @@ -51,24 +53,21 @@ func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig) (*F
return &FloatingResourceTypes{
zeroFloatingResources: zeroFloatingResources,
pools: pools,
rlFactory: rlFactory,
}, nil
}

func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated schedulerobjects.ResourceList) (bool, string) {
pool, exists := frt.pools[poolName]
if !exists {
func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) {
available := frt.GetTotalAvailableForPoolInternalTypes(poolName)
if available.AllZero() {
return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName)
}
rl := pool.totalResources.DeepCopy()
rl.Sub(allocated)
for resourceName, quantity := range rl.Resources {
if !frt.isFloatingResource(resourceName) {
continue
}
if quantity.Cmp(resource.Quantity{}) == -1 {
return false, fmt.Sprintf("not enough floating resource %s in pool %s", resourceName, poolName)
}

resourceName, _, _, exceeds := allocated.OfType(internaltypes.Floating).ExceedsAvailable(available)
if exceeds {
return false, fmt.Sprintf("not enough floating resource %s in pool %s", resourceName, poolName)
}

return true, ""
}

Expand All @@ -86,10 +85,8 @@ func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) sche
return pool.totalResources.DeepCopy()
}

func (frt *FloatingResourceTypes) AddTotalAvailableForPool(poolName string, kubernetesResources schedulerobjects.ResourceList) schedulerobjects.ResourceList {
floatingResources := frt.GetTotalAvailableForPool(poolName) // Note GetTotalAvailableForPool returns a deep copy
floatingResources.Add(kubernetesResources)
return floatingResources
func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList {
return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources)
}

func (frt *FloatingResourceTypes) SummaryString() string {
Expand All @@ -98,8 +95,3 @@ func (frt *FloatingResourceTypes) SummaryString() string {
}
return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ")
}

func (frt *FloatingResourceTypes) isFloatingResource(resourceName string) bool {
_, exists := frt.zeroFloatingResources.Resources[resourceName]
return exists
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,82 +7,93 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
)

func TestAllPools(t *testing.T) {
sut := makeSut(t)
sut := makeSut(t, makeRlFactory())
assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools())
}

func TestGetTotalAvailableForPool(t *testing.T) {
sut := makeSut(t)
sut := makeSut(t, makeRlFactory())
zero := resource.Quantity{}
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources)
}

func TestAddTotalAvailableForPool(t *testing.T) {
sut := makeSut(t)
zero := resource.Quantity{}
ten := *resource.NewQuantity(10, resource.DecimalSI)
kubernetesResources := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": ten}}
assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.AddTotalAvailableForPool("cpu", kubernetesResources).Resources)
assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.AddTotalAvailableForPool("gpu", kubernetesResources).Resources)
assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": zero, "floating-resource-2": zero}, sut.AddTotalAvailableForPool("some-other-pool", kubernetesResources).Resources)
assert.Equal(t, map[string]resource.Quantity{"cpu": ten}, kubernetesResources.Resources) // check hasn't mutated arg
func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) {
sut := makeSut(t, makeRlFactory())

cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu")
assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2"))

gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu")
assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2"))

notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value")
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2"))
}

func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("cpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("199")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("199")}),
)
assert.True(t, withinLimits)
assert.Empty(t, errorMessage)
}

func TestWithinLimits_WhenAtLimit_ReturnsTrue(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("cpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200")}),
)
assert.True(t, withinLimits)
assert.Empty(t, errorMessage)
}

func TestWithinLimits_WhenExceedsLimit_ReturnsFalse(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("cpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("201")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("201")}),
)
assert.False(t, withinLimits)
assert.NotEmpty(t, errorMessage)
}

func TestWithinLimits_IgnoresNonFloatingResources(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("cpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"some-other-resource": resource.MustParse("1000")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"cpu": resource.MustParse("1000")}),
)
assert.True(t, withinLimits)
assert.Empty(t, errorMessage)
}

func TestWithinLimits_WhenResourceNotSpecifiedForAPool_ReturnsFalse(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("gpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-2": resource.MustParse("1")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-2": resource.MustParse("1")}),
)
assert.False(t, withinLimits)
assert.NotEmpty(t, errorMessage)
}

func TestWithinLimits_WhenPoolDoesNotExist_ReturnsFalse(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("some-other-pool",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("1")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("1")}),
)
assert.False(t, withinLimits)
assert.NotEmpty(t, errorMessage)
Expand Down Expand Up @@ -115,8 +126,18 @@ func testConfig() []configuration.FloatingResourceConfig {
}
}

func makeSut(t *testing.T) *FloatingResourceTypes {
sut, err := NewFloatingResourceTypes(testConfig())
func makeRlFactory() *internaltypes.ResourceListFactory {
rlFactory, err := internaltypes.NewResourceListFactory([]configuration.ResourceType{
{Name: "cpu"},
}, testConfig())
if err != nil {
panic(err)
}
return rlFactory
}

func makeSut(t *testing.T, rlFactory *internaltypes.ResourceListFactory) *FloatingResourceTypes {
sut, err := NewFloatingResourceTypes(testConfig(), rlFactory)
assert.Nil(t, err)
return sut
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func TestMax(t *testing.T) {
factory := testFactory()
assert.Equal(t, 0.0, testResourceFractionList(factory, -0.1, 0.0, 0.0).Max())
assert.Equal(t, 0.0, testResourceFractionList(factory, 0.0, 0.0, 0.0).Max())
assert.Equal(t, 0.9, testResourceFractionList(factory, 0.1, 0.9, 0.7).Max())
assert.Equal(t, 0.9, testResourceFractionList(factory, 0.2, 0.9, 0.1).Max())
assert.Equal(t, 0.9, testResourceFractionList(factory, 0.9, 0.2, 0.1).Max())
}

func TestMax_HandlesEmptyCorrectly(t *testing.T) {
Expand Down
Loading

0 comments on commit 7b9ff1d

Please sign in to comment.