Skip to content

Commit

Permalink
Update ImexdaemonSettingsManager to pull IPs from ComputeDomain status
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Klues <[email protected]>
  • Loading branch information
klueska committed Jan 24, 2025
1 parent 07a17a1 commit 6d6e037
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 5 deletions.
13 changes: 12 additions & 1 deletion cmd/nvidia-dra-imex-plugin/device_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
Expand All @@ -32,6 +33,10 @@ import (
configapi "github.com/NVIDIA/k8s-dra-driver/api/nvidia.com/resource/v1beta1"
)

const (
CliqueIDLabelKey = "nvidia.com/gpu.clique"
)

type OpaqueDeviceConfig struct {
Requests []string
Config runtime.Object
Expand Down Expand Up @@ -83,7 +88,13 @@ func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) {
return nil, fmt.Errorf("unable to create CDI handler: %w", err)
}

imexDaemonSettingsManager := NewImexDaemonSettingsManager(config, ImexDaemonSettingsRoot)
node, err := config.clientsets.Core.CoreV1().Nodes().Get(ctx, config.flags.nodeName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting Node: %w", err)
}

cliqueID := node.Labels[CliqueIDLabelKey]
imexDaemonSettingsManager := NewImexDaemonSettingsManager(config, ImexDaemonSettingsRoot, cliqueID)

if err := cdi.CreateStandardDeviceSpecFile(allocatable); err != nil {
return nil, fmt.Errorf("unable to create base CDI spec file: %v", err)
Expand Down
7 changes: 7 additions & 0 deletions cmd/nvidia-dra-imex-plugin/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
resources.Devices = append(resources.Devices, device.GetDevice())
}

if err := state.imexDaemonSettingsManager.Start(ctx); err != nil {
return nil, err
}

if err := plugin.PublishResources(ctx, resources); err != nil {
return nil, err
}
Expand All @@ -84,6 +88,9 @@ func (d *driver) Shutdown() error {
if d == nil {
return nil
}
if err := d.state.imexDaemonSettingsManager.Stop(); err != nil {
return fmt.Errorf("error stopping ImexDaemonSettings manager: %w", err)
}
d.plugin.Stop()
return nil
}
Expand Down
122 changes: 118 additions & 4 deletions cmd/nvidia-dra-imex-plugin/imexdaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,38 @@ import (
"context"
"fmt"
"os"
"sync"
"text/template"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

cdiapi "tags.cncf.io/container-device-interface/pkg/cdi"
cdispec "tags.cncf.io/container-device-interface/specs-go"

nvapi "github.com/NVIDIA/k8s-dra-driver/api/nvidia.com/resource/v1beta1"
nvinformers "github.com/NVIDIA/k8s-dra-driver/pkg/nvidia.com/informers/externalversions"
)

const (
informerResyncPeriod = 10 * time.Minute

ImexDaemonSettingsRoot = DriverPluginPath + "/imex"
ImexDaemonConfigTemplatePath = "/templates/imex-daemon-config.tmpl.cfg"
)

type ImexDaemonSettingsManager struct {
config *Config
config *Config
waitGroup sync.WaitGroup
cancelContext context.CancelFunc

factory nvinformers.SharedInformerFactory
informer cache.SharedIndexInformer

configFilesRoot string
cliqueID string
}

type ImexDaemonSettings struct {
Expand All @@ -45,11 +63,57 @@ type ImexDaemonSettings struct {
nodesConfigPath string
}

func NewImexDaemonSettingsManager(config *Config, configFilesRoot string) *ImexDaemonSettingsManager {
return &ImexDaemonSettingsManager{
func NewImexDaemonSettingsManager(config *Config, configFilesRoot, cliqueID string) *ImexDaemonSettingsManager {
factory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, informerResyncPeriod)
informer := factory.Resource().V1beta1().ComputeDomains().Informer()

m := &ImexDaemonSettingsManager{
config: config,
factory: factory,
informer: informer,
configFilesRoot: configFilesRoot,
cliqueID: cliqueID,
}

return m
}

func (m *ImexDaemonSettingsManager) Start(ctx context.Context) (rerr error) {
ctx, cancel := context.WithCancel(ctx)
m.cancelContext = cancel

defer func() {
if rerr != nil {
if err := m.Stop(); err != nil {
klog.Errorf("error stopping ImexDaemonSettings manager: %v", err)
}
}
}()

err := m.informer.AddIndexers(cache.Indexers{
"computeDomainUID": uidIndexer[*nvapi.ComputeDomain],
})
if err != nil {
return fmt.Errorf("error adding indexer for UIDs: %w", err)
}

m.waitGroup.Add(1)
go func() {
defer m.waitGroup.Done()
m.factory.Start(ctx.Done())
}()

if !cache.WaitForCacheSync(ctx.Done(), m.informer.HasSynced) {
return fmt.Errorf("informer cache sync for ComputeDomains failed")
}

return nil
}

func (m *ImexDaemonSettingsManager) Stop() error {
m.cancelContext()
m.waitGroup.Wait()
return nil
}

func (m *ImexDaemonSettingsManager) NewSettings(domain string) *ImexDaemonSettings {
Expand Down Expand Up @@ -142,6 +206,56 @@ func (s *ImexDaemonSettings) WriteNodesConfigFile(ctx context.Context) error {
}

func (s *ImexDaemonSettings) GetNodeIPs(ctx context.Context) ([]string, error) {
ips := []string{"10.136.206.41", "10.136.206.42", "10.136.206.43", "10.136.206.44"}
// TODO: Move away from a retry solution and instead register a callback
// and react immediately when the desired ComputeDomain has its
// Status.Nodes field populated.
backoff := wait.Backoff{
Duration: time.Microsecond, // Initial delay
Factor: 3, // Factor to multiply duration each iteration
Jitter: 0, // Jitter factor for randomness
Steps: 16, // Maximum number of steps
Cap: 45 * time.Second, // Maximum backoff duration
}

var nodes []*nvapi.ComputeDomainNode
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
cd, err := s.GetComputeDomain(s.domain)
if err != nil {
return false, fmt.Errorf("error getting ComputeDomain: %w", err)
}
if cd == nil || cd.Status.Nodes == nil {
return false, nil
}
nodes = cd.Status.Nodes
return true, nil
})
if err != nil {
return nil, fmt.Errorf("error getting status of nodes in ComputeDomain: %w", err)
}

var ips []string
for _, node := range nodes {
if s.manager.cliqueID == node.CliqueID {
ips = append(ips, node.IPAddress)
}
}
return ips, nil
}

func (s *ImexDaemonSettings) GetComputeDomain(uid string) (*nvapi.ComputeDomain, error) {
cds, err := s.manager.informer.GetIndexer().ByIndex("computeDomainUID", uid)
if err != nil {
return nil, fmt.Errorf("error retrieving ComputeDomain by UID: %w", err)
}
if len(cds) == 0 {
return nil, nil
}
if len(cds) != 1 {
return nil, fmt.Errorf("multiple ComputeDomains with the same UID")
}
cd, ok := cds[0].(*nvapi.ComputeDomain)
if !ok {
return nil, fmt.Errorf("failed to cast to ComputeDomain")
}
return cd, nil
}
31 changes: 31 additions & 0 deletions cmd/nvidia-dra-imex-plugin/indexers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2025 NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func uidIndexer[T metav1.ObjectMetaAccessor](obj any) ([]string, error) {
d, ok := obj.(T)
if !ok {
return nil, fmt.Errorf("expected a %T but got %T", *new(T), obj)
}
return []string{string(d.GetObjectMeta().GetUID())}, nil
}

0 comments on commit 6d6e037

Please sign in to comment.