@@ -29,24 +29,26 @@ type WorkloadAllocator interface {
29
29
// TimeoutFunc is a predicate that takes an ID (named runnerID) and a last-activity time and returns a bool indicating a timeout. The allocator holds two values of this type: one for model staleness and one for slot timeouts.
30
30
type TimeoutFunc func (runnerID string , lastActivityTime time.Time ) bool
31
31
32
- // allocator implements the WorkloadAllocator interface, managing runners, slots, and workload allocation.
33
- type allocator struct {
32
+ // Allocator implements the WorkloadAllocator interface. It keeps slots in a map keyed by slot ID and holds the two timeout functions that are handed to each newly created slot.
33
+ type Allocator struct {
34
34
slots * xsync.MapOf [uuid.UUID , * Slot ] // Maps slot ID to Slot details.
35
35
modelStaleFunc TimeoutFunc // Checks whether a model is stale; passed to NewSlot.
36
36
slotTimeoutFunc TimeoutFunc // Checks whether a slot has timed out due to an error; passed to NewSlot.
37
37
}
38
38
39
+ var _ WorkloadAllocator = & Allocator {} // Compile-time assertion that *Allocator satisfies the WorkloadAllocator interface.
40
+
39
41
// NewWorkloadAllocator creates a new Allocator with a fresh slot map. staleFunc is stored as the model-staleness check and slotTimeoutFunc as the slot-timeout check.
40
- func NewWorkloadAllocator (staleFunc TimeoutFunc , slotTimeoutFunc TimeoutFunc ) * allocator {
41
- return & allocator {
42
+ func NewWorkloadAllocator (staleFunc TimeoutFunc , slotTimeoutFunc TimeoutFunc ) * Allocator {
43
+ return & Allocator {
42
44
slots : xsync .NewMapOf [uuid.UUID , * Slot ](),
43
45
modelStaleFunc : staleFunc ,
44
46
slotTimeoutFunc : slotTimeoutFunc ,
45
47
}
46
48
}
47
49
48
50
// AllocateSlot assigns a workload to a specific slot, validating the model and slot before scheduling.
49
- func (a * allocator ) AllocateSlot (slotID uuid.UUID , req * Workload ) error {
51
+ func (a * Allocator ) AllocateSlot (slotID uuid.UUID , req * Workload ) error {
50
52
// Validate model
51
53
if _ , err := model .GetModel (req .ModelName ().String ()); err != nil {
52
54
return fmt .Errorf ("unable to get model (%s): %v" , req .ModelName (), err )
@@ -81,7 +83,7 @@ func (a *allocator) AllocateSlot(slotID uuid.UUID, req *Workload) error {
81
83
}
82
84
83
85
// AllocateNewSlot creates a new slot for a workload and allocates it to the best available runner.
84
- func (a * allocator ) AllocateNewSlot (runnerID string , req * Workload ) (* Slot , error ) {
86
+ func (a * Allocator ) AllocateNewSlot (runnerID string , req * Workload ) (* Slot , error ) {
85
87
// Create a new slot and schedule the workload.
86
88
slot := NewSlot (runnerID , req , a .modelStaleFunc , a .slotTimeoutFunc )
87
89
log .Trace ().
@@ -100,7 +102,7 @@ func (a *allocator) AllocateNewSlot(runnerID string, req *Workload) (*Slot, erro
100
102
}
101
103
102
104
// ReleaseSlot frees the resources allocated to a specific slot.
103
- func (a * allocator ) ReleaseSlot (slotID uuid.UUID ) error {
105
+ func (a * Allocator ) ReleaseSlot (slotID uuid.UUID ) error {
104
106
// Find the slot.
105
107
slot , ok := a .slots .Load (slotID )
106
108
if ! ok {
@@ -121,7 +123,7 @@ func (a *allocator) ReleaseSlot(slotID uuid.UUID) error {
121
123
}
122
124
123
125
// ReconcileSlots updates the state of a runner and reconciles its slots with the allocator's records.
124
- func (a * allocator ) ReconcileSlots (props * types.RunnerState ) error {
126
+ func (a * Allocator ) ReconcileSlots (props * types.RunnerState ) error {
125
127
// Log runner state update.
126
128
l := log .With ().
127
129
Str ("runner_id" , props .ID ).
@@ -216,7 +218,7 @@ func (a *allocator) ReconcileSlots(props *types.RunnerState) error {
216
218
}
217
219
218
220
// WarmSlots returns a list of available slots with warm models waiting for work.
219
- func (a * allocator ) WarmSlots (req * Workload ) []* Slot {
221
+ func (a * Allocator ) WarmSlots (req * Workload ) []* Slot {
220
222
cosyWarm := make ([]* Slot , 0 , a .slots .Size ())
221
223
222
224
a .slots .Range (func (id uuid.UUID , slot * Slot ) bool {
@@ -268,7 +270,7 @@ func (a *allocator) WarmSlots(req *Workload) []*Slot {
268
270
}
269
271
270
272
// RunnerSlots returns all slots associated with a specific runner ID.
271
- func (a * allocator ) RunnerSlots (id string ) []* Slot {
273
+ func (a * Allocator ) RunnerSlots (id string ) []* Slot {
272
274
allSlots := Values (a .slots )
273
275
// Filter slots to include only those belonging to the specified runner.
274
276
return Filter (allSlots , func (s * Slot ) bool {
@@ -278,7 +280,7 @@ func (a *allocator) RunnerSlots(id string) []*Slot {
278
280
279
281
// DeadSlots checks for any runners that have timed out and removes them.
280
282
// It returns the slots associated with the dead runners.
281
- func (a * allocator ) DeadSlots (deadRunnerIDs []string ) []* Slot {
283
+ func (a * Allocator ) DeadSlots (deadRunnerIDs []string ) []* Slot {
282
284
deadSlots := make ([]* Slot , 0 )
283
285
// Iterate through runners to check if any have timed out.
284
286
for _ , runnerID := range deadRunnerIDs {
@@ -303,7 +305,7 @@ func (a *allocator) DeadSlots(deadRunnerIDs []string) []*Slot {
303
305
}
304
306
305
307
// StartSlot marks scheduled work as in progress.
306
- func (a * allocator ) StartSlot (slotID uuid.UUID ) error {
308
+ func (a * Allocator ) StartSlot (slotID uuid.UUID ) error {
307
309
// Find the slot.
308
310
slot , ok := a .slots .Load (slotID )
309
311
if ! ok {
@@ -326,6 +328,6 @@ func (a *allocator) StartSlot(slotID uuid.UUID) error {
326
328
return nil
327
329
}
328
330
329
// DeleteSlot removes the entry for slotID from the allocator's slot map. It returns nothing, so callers cannot tell whether the ID was present.
- func (a * allocator ) DeleteSlot (slotID uuid.UUID ) {
331
+ func (a * Allocator ) DeleteSlot (slotID uuid.UUID ) {
330
332
a .slots .Delete (slotID )
331
333
}
0 commit comments