Skip to content

Commit

Permalink
[YUNIKORN-2907] fix based on reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael committed Dec 31, 2024
1 parent 64eed25 commit 3f85580
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (cc *ClusterContext) updateSchedulerConfig(conf *configs.SchedulerConfig, r
part, ok := cc.partitions[p.Name]
if ok {
// make sure the new info passes all checks
_, err = newPartitionContext(p, rmID, nil)
_, err = newPartitionContextForValidation(p, rmID, nil)
if err != nil {
return err
}
Expand Down
16 changes: 6 additions & 10 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,19 @@ func newBlankQueue() *Queue {
func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) {
queue, err := newConfiguredQueueInternal(conf, parent)
if queue != nil {
queue.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem())
log.Log(log.SchedQueue).Info("configured queue added to scheduler",
zap.String("queueName", queue.QueuePath))
queue.queueEvents.SendNewQueueEvent(queue.QueuePath, queue.isManaged)
}
return queue, err
}

// NewConfiguredShadowQueue creates a new queue from scratch based on the configuration and logs at debug level
// NewConfiguredQueueForValidation is used to validate the queue configuration.
// It works similarly to NewConfiguredQueue but neither logs the queue creation nor sends a queue event.
// lock free as it cannot be referenced yet
func NewConfiguredShadowQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) {
queue, err := newConfiguredQueueInternal(conf, parent)
if queue != nil {
log.Log(log.SchedQueue).Debug("shadow queue created",
zap.String("queueName", queue.QueuePath))
}
return queue, err
func NewConfiguredQueueForValidation(conf configs.QueueConfig, parent *Queue) (*Queue, error) {
return newConfiguredQueueInternal(conf, parent)
}

func newConfiguredQueueInternal(conf configs.QueueConfig, parent *Queue) (*Queue, error) {
Expand Down Expand Up @@ -164,8 +162,6 @@ func newConfiguredQueueInternal(conf configs.QueueConfig, parent *Queue) (*Queue
} else {
sq.UpdateQueueProperties()
}
sq.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem())
sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged)

return sq, nil
}
Expand Down
83 changes: 77 additions & 6 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,29 @@ type PartitionContext struct {
locking.RWMutex
}

// newPartitionContextForValidation initializes a shadow partition based on the configuration.
// The shadow partition is used to validate the configuration, it is not used for scheduling.
func newPartitionContextForValidation(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
pc, err := newPartitionContextInternal(conf, rmID, cc)
if pc != nil {
if err := pc.initialPartitionFromConfigForValidation(conf); err != nil {
return nil, err
}
}
return pc, err
}

func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
pc, err := newPartitionContextInternal(conf, rmID, cc)
if pc != nil {
if err := pc.initialPartitionFromConfig(conf); err != nil {
return nil, err
}
}
return pc, err
}

func newPartitionContextInternal(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
if conf.Name == "" || rmID == "" {
log.Log(log.SchedPartition).Info("partition cannot be created",
zap.String("partition name", conf.Name),
Expand All @@ -98,13 +120,41 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
foreignAllocs: make(map[string]*objects.Allocation),
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf); err != nil {
return nil, err
}

return pc, nil
}

// initialPartitionFromConfigForValidation is used to validate the partition configuration.
// It works similarly to initialPartitionFromConfig but neither logs the queue creation, sends a queue event, logs the node sorting policy,
// nor updates user settings.
func (pc *PartitionContext) initialPartitionFromConfigForValidation(conf configs.PartitionConfig) error {
if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
return fmt.Errorf("partition cannot be created without root queue")
}

// Setup the queue structure: root first it should be the only queue at this level
// Add the rest of the queue structure recursively
queueConf := conf.Queues[0]
var err error
if pc.root, err = objects.NewConfiguredQueueForValidation(queueConf, nil); err != nil {
return err
}
// recursively add the queues to the root
if err = pc.addQueueForValidation(queueConf.Queues, pc.root); err != nil {
return err
}

// We need to pass in the locked version of the GetQueue function.
// Placing an application will not have a lock on the partition context.
pc.placementManager = placement.NewPlacementManager(conf.PlacementRules, pc.GetQueue)
// get the user group cache for the partition
pc.userGroupCache = security.GetUserGroupCache("")
pc.updateNodeSortingPolicyForValidation(conf)
pc.updatePreemption(conf)

return nil
}

// Initialise the partition
func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionConfig) error {
if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
Expand Down Expand Up @@ -138,6 +188,13 @@ func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionCon
return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
}

// updateNodeSortingPolicyForValidation is used to validate the partition configuration.
// It works similarly to updateNodeSortingPolicy but without logging.
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateNodeSortingPolicyForValidation(conf configs.PartitionConfig) {
pc.updateNodeSortingPolicyInternal(conf)
}

// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateNodeSortingPolicy(conf configs.PartitionConfig) {
var configuredPolicy policies.SortingPolicy
Expand All @@ -150,6 +207,10 @@ func (pc *PartitionContext) updateNodeSortingPolicy(conf configs.PartitionConfig
log.Log(log.SchedPartition).Info("NodeSorting policy set from config",
zap.Stringer("policyName", configuredPolicy))
}
pc.updateNodeSortingPolicyInternal(conf)
}

func (pc *PartitionContext) updateNodeSortingPolicyInternal(conf configs.PartitionConfig) {
pc.nodes.SetNodeSortingPolicy(objects.NewNodeSortingPolicy(conf.NodeSortPolicy.Type, conf.NodeSortPolicy.ResourceWeights))
}

Expand Down Expand Up @@ -191,17 +252,27 @@ func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig)
return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
}

func (pc *PartitionContext) addQueueForValidation(conf []configs.QueueConfig, parent *objects.Queue) error {
err := pc.addQueueInternal(conf, parent, objects.NewConfiguredQueueForValidation)
return err
}

// Process the config structure and create a queue info tree for this partition
func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent *objects.Queue) error {
err := pc.addQueueInternal(conf, parent, objects.NewConfiguredQueue)
return err
}

func (pc *PartitionContext) addQueueInternal(conf []configs.QueueConfig, parent *objects.Queue, newQueueFn func(configs.QueueConfig, *objects.Queue) (*objects.Queue, error)) error {
// create the queue at this level
for _, queueConf := range conf {
thisQueue, err := objects.NewConfiguredQueue(queueConf, parent)
thisQueue, err := newQueueFn(queueConf, parent)
if err != nil {
return err
}
// recursive create the queues below
if len(queueConf.Queues) > 0 {
err = pc.addQueue(queueConf.Queues, thisQueue)
err = pc.addQueueInternal(queueConf.Queues, thisQueue, newQueueFn)
if err != nil {
return err
}
Expand All @@ -224,7 +295,7 @@ func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *o
queue := pc.getQueueInternal(pathName)
var err error
if queue == nil {
queue, err = objects.NewConfiguredShadowQueue(queueConfig, parent)
queue, err = objects.NewConfiguredQueue(queueConfig, parent)
} else {
err = queue.ApplyConf(queueConfig)
}
Expand Down

0 comments on commit 3f85580

Please sign in to comment.