Skip to content

Commit

Permalink
Move creation of IMEX channel pool to after the deployment is fully up.
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Klues <[email protected]>
  • Loading branch information
klueska committed Jan 24, 2025
1 parent 29ba85b commit d17ee67
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 37 deletions.
43 changes: 42 additions & 1 deletion cmd/nvidia-dra-imex-controller/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,15 @@ func (m *DeploymentManager) Delete(ctx context.Context, cdUID string) error {
}

key := d.Spec.Selector.MatchLabels[computeDomainLabelKey]

if err := m.removePodManager(key); err != nil {
return fmt.Errorf("error removing Pod manager: %w", err)
}

if err := m.imexChannelManager.DeletePool(key); err != nil {
return fmt.Errorf("error deleting IMEX channel pool: %w", err)
}

return nil
}

Expand Down Expand Up @@ -301,6 +306,42 @@ func (m *DeploymentManager) onAddOrUpdate(ctx context.Context, obj any) error {
return fmt.Errorf("error adding Pod manager '%s/%s': %w", d.Namespace, d.Name, err)
}

if d.Status.AvailableReplicas != *d.Spec.Replicas {
return nil
}

if err := m.createOrUpdatePool(d, cd); err != nil {
return fmt.Errorf("error creating or updating pool: %w", err)
}

return nil
}

func (m *DeploymentManager) createOrUpdatePool(d *appsv1.Deployment, cd *nvapi.ComputeDomain) error {
var nodeNames []string
for _, node := range cd.Status.Nodes {
nodeNames = append(nodeNames, node.Name)
}

nodeSelector := corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: corev1.NodeSelectorOpIn,
Values: nodeNames,
},
},
},
},
}

computeDomainLabel := d.Spec.Selector.MatchLabels[computeDomainLabelKey]
if err := m.imexChannelManager.CreateOrUpdatePool(computeDomainLabel, &nodeSelector); err != nil {
return fmt.Errorf("failed to create or update IMEX channel pool: %w", err)
}

return nil
}

Expand All @@ -311,7 +352,7 @@ func (m *DeploymentManager) addPodManager(ctx context.Context, labelSelector *me
return nil
}

podManager := NewDeploymentPodManager(m.config, m.imexChannelManager, labelSelector, numPods, m.getComputeDomain)
podManager := NewDeploymentPodManager(m.config, labelSelector, numPods, m.getComputeDomain)

if err := podManager.Start(ctx); err != nil {
return fmt.Errorf("error creating Pod manager: %w", err)
Expand Down
43 changes: 7 additions & 36 deletions cmd/nvidia-dra-imex-controller/deploymentpods.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,10 @@ type DeploymentPodManager struct {

getComputeDomain GetComputeDomainFunc
computeDomainNodes []*nvapi.ComputeDomainNode
computeDomainLabel string
numPods int
nodeSelector corev1.NodeSelector

imexChannelManager *ImexChannelManager
}

func NewDeploymentPodManager(config *ManagerConfig, imexChannelManager *ImexChannelManager, labelSelector *metav1.LabelSelector, numPods int, getComputeDomain GetComputeDomainFunc) *DeploymentPodManager {
func NewDeploymentPodManager(config *ManagerConfig, labelSelector *metav1.LabelSelector, numPods int, getComputeDomain GetComputeDomainFunc) *DeploymentPodManager {
factory := informers.NewSharedInformerFactoryWithOptions(
config.clientsets.Core,
informerResyncPeriod,
Expand All @@ -68,30 +64,13 @@ func NewDeploymentPodManager(config *ManagerConfig, imexChannelManager *ImexChan
informer := factory.Core().V1().Pods().Informer()
lister := factory.Core().V1().Pods().Lister()

nodeSelector := corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: corev1.NodeSelectorOpIn,
Values: []string{},
},
},
},
},
}

m := &DeploymentPodManager{
config: config,
factory: factory,
informer: informer,
lister: lister,
getComputeDomain: getComputeDomain,
computeDomainLabel: labelSelector.MatchLabels[computeDomainLabelKey],
numPods: numPods,
nodeSelector: nodeSelector,
imexChannelManager: imexChannelManager,
config: config,
factory: factory,
informer: informer,
lister: lister,
getComputeDomain: getComputeDomain,
numPods: numPods,
}

return m
Expand Down Expand Up @@ -135,9 +114,6 @@ func (m *DeploymentPodManager) Start(ctx context.Context) (rerr error) {
}

func (m *DeploymentPodManager) Stop() error {
if err := m.imexChannelManager.DeletePool(m.computeDomainLabel); err != nil {
return fmt.Errorf("error deleting IMEX channel pool: %w", err)
}
m.cancelContext()
m.waitGroup.Wait()
return nil
Expand Down Expand Up @@ -194,11 +170,6 @@ func (m *DeploymentPodManager) onPodAddOrUpdate(ctx context.Context, obj any) er
return fmt.Errorf("error updating nodes in ComputeDomain status: %w", err)
}

m.nodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values = nodeNames
if err := m.imexChannelManager.CreateOrUpdatePool(m.computeDomainLabel, &m.nodeSelector); err != nil {
return fmt.Errorf("failed to create or update IMEX channel pool: %w", err)
}

return nil
}

Expand Down

0 comments on commit d17ee67

Please sign in to comment.