Skip to content

Commit

Permalink
POC of Market Based Pricing (#4070)
Browse files Browse the repository at this point in the history
* F/chrisma/internal price poc (#293)

* proto

* price to scheduler

* wip

* Update README.md

* wip

* wip

* wip

---------

Co-authored-by: Chris Martin <[email protected]>
Co-authored-by: Chris Martin <[email protected]>

* wip

* wip

* wip

* lint

* lint

* lint

* lint

* lint

* lint

* add spot price

* lint

* gang preemption test

* lint

* revert files

* fix kind

* fixes following testing

* revert infra change

* revert infra change

* fix test

* revert run config

* typos

---------

Co-authored-by: Christopher Martin <[email protected]>
  • Loading branch information
d80tb7 and svc-oeg-aws2github authored Dec 2, 2024
1 parent 500e08a commit d5612f5
Show file tree
Hide file tree
Showing 41 changed files with 2,466 additions and 661 deletions.
4 changes: 2 additions & 2 deletions .run/Executor.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<env name="ARMADA_EXECUTORAPICONNECTION_FORCENOTLS" value="true" />
<env name="ARMADA_HTTPPORT" value="8085" />
<env name="HOME" value="$USER_HOME$/" />
<env name="KUBECONFIG" value="$PROJECT_DIR$/.kube/internal/config" />
<env name="KUBECONFIG" value="$PROJECT_DIR$/.kube/external/config" />
</envs>
<pass_parent_env value="false" />
<kind value="FILE" />
Expand All @@ -17,4 +17,4 @@
<filePath value="$PROJECT_DIR$/cmd/executor/main.go" />
<method v="2" />
</configuration>
</component>
</component>
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,4 @@ For more information about contributing to Armada see [CONTRIBUTING.md](https://
## Discussion

If you are interested in discussing Armada you can find us on [![slack](https://img.shields.io/badge/slack-armada-brightgreen.svg?logo=slack)](https://cloud-native.slack.com/?redir=%2Farchives%2FC03T9CBCEMC)

3 changes: 3 additions & 0 deletions e2e/setup/kind.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ apiVersion: kind.x-k8s.io/v1alpha4
name: armada-test
featureGates:
"KubeletInUserNamespace": true
networking:
apiServerAddress: 0.0.0.0
nodes:
- role: worker
image: kindest/node:v1.26.15
Expand All @@ -28,3 +30,4 @@ nodes:
- containerPort: 6443 # control plane
hostPort: 6443 # exposes control plane on localhost:6443
protocol: TCP

15 changes: 11 additions & 4 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
podSpecs = k8sPodSpecs
}

var priceInfo *api.ExperimentalPriceInfo
if e.ExperimentalPriceInfo != nil {
priceInfo = &api.ExperimentalPriceInfo{
BidPrice: e.ExperimentalPriceInfo.BidPrice,
}
}

return &api.Job{
Id: e.JobId,
ClientId: e.DeduplicationId,
Expand All @@ -170,10 +177,10 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
Labels: e.ObjectMeta.Labels,
Annotations: e.ObjectMeta.Annotations,

K8SIngress: k8sIngresses,
K8SService: k8sServices,

Priority: float64(e.Priority),
K8SIngress: k8sIngresses,
K8SService: k8sServices,
ExperimentalPriceInfo: priceInfo,
Priority: float64(e.Priority),

PodSpec: podSpec,
PodSpecs: podSpecs,
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ type PoolConfig struct {
Name string `validate:"required"`
AwayPools []string
ProtectedFractionOfFairShare *float64
MarketDriven bool
}

func (sc *SchedulingConfig) GetProtectedFractionOfFairShare(poolName string) float64 {
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/database/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (r *PostgresJobRepository) FetchInitialJobs(ctx *armadacontext.Context) ([]
JobSet: row.JobSet,
Queue: row.Queue,
Priority: row.Priority,
BidPrice: row.BidPrice,
Submitted: row.Submitted,
Validated: row.Validated,
Queued: row.Queued,
Expand Down Expand Up @@ -228,6 +229,7 @@ func (r *PostgresJobRepository) FetchJobUpdates(ctx *armadacontext.Context, jobS
JobSet: row.JobSet,
Queue: row.Queue,
Priority: row.Priority,
BidPrice: row.BidPrice,
Submitted: row.Submitted,
Validated: row.Validated,
Queued: row.Queued,
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/database/migrations/018_add_price.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds the bid_price column to the jobs table. The column is NOT NULL with a
-- default of 0, so rows that already exist receive a bid price of 0.
ALTER TABLE jobs ADD COLUMN bid_price double precision NOT NULL DEFAULT 0;
1 change: 1 addition & 0 deletions internal/scheduler/database/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 18 additions & 8 deletions internal/scheduler/database/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/scheduler/database/query/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ SELECT * FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2;
SELECT job_id FROM jobs;

-- name: SelectInitialJobs :many
SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 AND cancelled = 'false' AND succeeded = 'false' and failed = 'false' ORDER BY serial LIMIT $2;
SELECT job_id, job_set, queue, priority, bid_price, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 AND cancelled = 'false' AND succeeded = 'false' and failed = 'false' ORDER BY serial LIMIT $2;

-- name: SelectUpdatedJobs :many
SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2;
SELECT job_id, job_set, queue, priority, bid_price, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2;

-- name: UpdateJobPriorityByJobSet :exec
UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 and cancelled = false and succeeded = false and failed = false;
Expand Down
68 changes: 66 additions & 2 deletions internal/scheduler/jobdb/comparison.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package jobdb

type (
JobPriorityComparer struct{}
JobIdHasher struct{}
JobPriorityComparer struct{}
MarketJobPriorityComparer struct{}
JobIdHasher struct{}
)

func (JobIdHasher) Hash(j *Job) uint32 {
Expand All @@ -21,6 +22,10 @@ func (JobPriorityComparer) Compare(job, other *Job) int {
return SchedulingOrderCompare(job, other)
}

// Compare orders two jobs by delegating to MarketSchedulingOrderCompare and
// returning its result unchanged.
func (MarketJobPriorityComparer) Compare(job, other *Job) int {
return MarketSchedulingOrderCompare(job, other)
}

// SchedulingOrderCompare defines the order in which jobs in a particular queue should be scheduled.
func (job *Job) SchedulingOrderCompare(other *Job) int {
// We need this cast for now to expose this method via an interface.
Expand Down Expand Up @@ -93,3 +98,62 @@ func SchedulingOrderCompare(job, other *Job) int {
}
panic("We should never get here. Since we check for job id equality at the top of this function.")
}

// MarketSchedulingOrderCompare orders two jobs for market-based scheduling.
// It returns -1 if job sorts before other, 1 if job sorts after other, and 0
// only when both jobs share the same id.
//
// The keys below are consulted in sequence; the first one that differs decides:
//  1. bidPrice, higher first;
//  2. priorityClass.Priority, higher first;
//  3. priority, lower value first;
//  4. activeRunTimestamp, lower first (only when both jobs have a non-terminal active run);
//  5. submittedTime, lower first;
//  6. id, lower first.
func MarketSchedulingOrderCompare(job, other *Job) int {
	// Identical ids compare equal regardless of every other field, which
	// ensures at most one job with a particular id can exist in the jobDb.
	if job.id == other.id {
		return 0
	}

	// The higher bid wins.
	switch {
	case job.bidPrice > other.bidPrice:
		return -1
	case job.bidPrice < other.bidPrice:
		return 1
	}

	// Equal bids: the higher priority-class priority wins.
	switch {
	case job.priorityClass.Priority > other.priorityClass.Priority:
		return -1
	case job.priorityClass.Priority < other.priorityClass.Priority:
		return 1
	}

	// Then the queue-level priority, where a numerically lower value sorts first.
	switch {
	case job.priority < other.priority:
		return -1
	case job.priority > other.priority:
		return 1
	}

	// The active-run timestamp is only consulted when both jobs have a run
	// that has not reached a terminal state; the earlier timestamp sorts first.
	jobRunning := job.activeRun != nil && !job.activeRun.InTerminalState()
	otherRunning := other.activeRun != nil && !other.activeRun.InTerminalState()
	if jobRunning && otherRunning {
		switch {
		case job.activeRunTimestamp < other.activeRunTimestamp:
			return -1
		case job.activeRunTimestamp > other.activeRunTimestamp:
			return 1
		}
	}

	// Earlier submission sorts first.
	switch {
	case job.submittedTime < other.submittedTime:
		return -1
	case job.submittedTime > other.submittedTime:
		return 1
	}

	// Final tie-break on id. The ids are known to differ at this point, so one
	// of the two cases always fires and the ordering between jobs is total.
	switch {
	case job.id < other.id:
		return -1
	case job.id > other.id:
		return 1
	}
	panic("We should never get here. Since we check for job id equality at the top of this function.")
}
17 changes: 17 additions & 0 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Job struct {
jobSet string
// Per-queue priority of this job.
priority uint32
// Price the user is willing to pay to have this job scheduled.
bidPrice float64
// Requested per queue priority of this job.
// This is used when syncing the postgres database with the scheduler-internal database.
requestedPriority uint32
Expand Down Expand Up @@ -300,6 +302,9 @@ func (job *Job) Equal(other *Job) bool {
if job.priority != other.priority {
return false
}
if job.bidPrice != other.bidPrice {
return false
}
if job.requestedPriority != other.requestedPriority {
return false
}
Expand Down Expand Up @@ -378,6 +383,11 @@ func (job *Job) Priority() uint32 {
return job.priority
}

// BidPrice returns the price the user is willing to pay to have this job scheduled.
func (job *Job) BidPrice() float64 {
return job.bidPrice
}

// PriorityClass returns the priority class of the job.
func (job *Job) PriorityClass() types.PriorityClass {
return job.priorityClass
Expand Down Expand Up @@ -413,6 +423,13 @@ func (job *Job) WithPriority(priority uint32) *Job {
return j
}

// WithBidPrice returns a copy of the job, produced by copyJob, whose bidPrice
// is set to price. The receiver's own bidPrice field is left untouched.
func (job *Job) WithBidPrice(price float64) *Job {
j := copyJob(*job)
j.bidPrice = price
return j
}

// WithPools returns a copy of the job with the pools updated.
func (job *Job) WithPools(pools []string) *Job {
j := copyJob(*job)
Expand Down
Loading

0 comments on commit d5612f5

Please sign in to comment.