Skip to content

Commit

Permalink
Refactor the IMEX controller code
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Klues <[email protected]>
  • Loading branch information
klueska committed Oct 25, 2024
1 parent b80afe7 commit 8424b28
Showing 1 changed file with 117 additions and 76 deletions.
193 changes: 117 additions & 76 deletions cmd/nvidia-dra-controller/imex.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,25 @@ const (
ImexDomainLabel = "nvidia.com/gpu.imex-domain"
ResourceSliceImexChannelLimit = 128
DriverImexChannelLimit = 2048
RetryTimeout = 1 * time.Minute
)

type ImexManager struct {
waitGroup sync.WaitGroup
clientset kubernetes.Interface
}

// imexDomainOffsets represents the offset for assigning IMEX channels
// to ResourceSlices for each <imex-domain, cliqueid> combination.
type imexDomainOffsets map[string]map[string]int

type ImexManager struct {
driverName string
resourceSliceImexChannelLimit int
driverImexChannelLimit int
retryTimeout time.Duration
waitGroup sync.WaitGroup
clientset kubernetes.Interface
imexDomainOffsets imexDomainOffsets
owner resourceslice.Owner
driverResources *resourceslice.DriverResources
}

func StartIMEXManager(ctx context.Context, config *Config) (*ImexManager, error) {
// Build a client set config
csconfig, err := config.flags.kubeClientConfig.NewClientSetConfig()
Expand Down Expand Up @@ -79,21 +87,25 @@ func StartIMEXManager(ctx context.Context, config *Config) (*ImexManager, error)
UID: pod.UID,
}

// Create the manager itself
m := &ImexManager{
clientset: clientset,
// Create a new set of DriverResources
driverResources := &resourceslice.DriverResources{
Pools: make(map[string]resourceslice.Pool),
}

// Stream added/removed IMEX domains from nodes over time
klog.Info("Start streaming IMEX domains from nodes...")
addedDomainsCh, removedDomainsCh, err := m.streamImexDomains(ctx)
if err != nil {
return nil, fmt.Errorf("error streaming IMEX domains: %w", err)
// Create the manager itself
m := &ImexManager{
driverName: DriverName,
resourceSliceImexChannelLimit: ResourceSliceImexChannelLimit,
driverImexChannelLimit: DriverImexChannelLimit,
retryTimeout: RetryTimeout,
clientset: clientset,
owner: owner,
driverResources: driverResources,
imexDomainOffsets: make(imexDomainOffsets),
}

// Add/Remove resource slices from IMEX domains as they come and go
klog.Info("Start publishing IMEX channels to ResourceSlices...")
err = m.manageResourceSlices(ctx, owner, addedDomainsCh, removedDomainsCh)
err = m.manageResourceSlices(ctx)
if err != nil {
return nil, fmt.Errorf("error managing resource slices: %w", err)
}
Expand All @@ -102,35 +114,44 @@ func StartIMEXManager(ctx context.Context, config *Config) (*ImexManager, error)
}

// manageResourceSlices reacts to added and removed IMEX domains and triggers the creation / removal of resource slices accordingly.
func (m *ImexManager) manageResourceSlices(ctx context.Context, owner resourceslice.Owner, addedDomainsCh <-chan string, removedDomainsCh <-chan string) error {
driverResources := &resourceslice.DriverResources{}
controller, err := resourceslice.StartController(ctx, m.clientset, DriverName, owner, driverResources)
func (m *ImexManager) manageResourceSlices(ctx context.Context) error {
klog.Info("Start streaming IMEX domains from nodes...")
addedDomainsCh, removedDomainsCh, err := m.streamImexDomains(ctx)
if err != nil {
return fmt.Errorf("error streaming IMEX domains: %w", err)
}

klog.Info("Start publishing IMEX channels to ResourceSlices...")
controller, err := resourceslice.StartController(ctx, m.clientset, m.driverName, m.owner, m.driverResources)
if err != nil {
return fmt.Errorf("error starting resource slice controller: %w", err)
}

imexDomainOffsets := new(imexDomainOffsets)
m.waitGroup.Add(1)
go func() {
defer m.waitGroup.Done()
for {
select {
case addedDomain := <-addedDomainsCh:
offset, err := imexDomainOffsets.add(addedDomain, ResourceSliceImexChannelLimit, DriverImexChannelLimit)
if err != nil {
klog.Errorf("Error calculating channel offset for IMEX domain %s: %v", addedDomain, err)
return
}
klog.Infof("Adding channels for new IMEX domain: %v", addedDomain)
driverResources := driverResources.DeepCopy()
driverResources.Pools[addedDomain] = generateImexChannelPool(addedDomain, offset, ResourceSliceImexChannelLimit)
controller.Update(driverResources)
if err := m.addImexDomain(addedDomain); err != nil {
klog.Errorf("Error adding channels for IMEX domain %s: %v", addedDomain, err)
go func() {
time.Sleep(m.retryTimeout)
addedDomainsCh <- addedDomain
}()
}
controller.Update(m.driverResources)
case removedDomain := <-removedDomainsCh:
klog.Infof("Removing channels for removed IMEX domain: %v", removedDomain)
driverResources := driverResources.DeepCopy()
delete(driverResources.Pools, removedDomain)
imexDomainOffsets.remove(removedDomain)
controller.Update(driverResources)
if err := m.removeImexDomain(removedDomain); err != nil {
klog.Errorf("Error removing channels for IMEX domain %s: %v", removedDomain, err)
go func() {
time.Sleep(m.retryTimeout)
removedDomainsCh <- removedDomain
}()
}
controller.Update(m.driverResources)
case <-ctx.Done():
return
}
Expand All @@ -155,8 +176,27 @@ func (m *ImexManager) Stop() error {
return nil
}

// addImexDomain adds an IMEX domain to be managed by the ImexManager.
func (m *ImexManager) addImexDomain(imexDomain string) error {
offset, err := m.imexDomainOffsets.add(imexDomain, m.resourceSliceImexChannelLimit, m.driverImexChannelLimit)
if err != nil {
return fmt.Errorf("error setting offset for IMEX channels: %w", err)
}
m.driverResources = m.driverResources.DeepCopy()
m.driverResources.Pools[imexDomain] = generateImexChannelPool(imexDomain, offset, m.resourceSliceImexChannelLimit)
return nil
}

// removeImexDomain removes an IMEX domain from being managed by the ImexManager.
func (m *ImexManager) removeImexDomain(imexDomain string) error {
m.imexDomainOffsets.remove(imexDomain)
m.driverResources = m.driverResources.DeepCopy()
delete(m.driverResources.Pools, imexDomain)
return nil
}

// streamImexDomains returns two channels that streams imexDomans that are added and removed from nodes over time.
func (m *ImexManager) streamImexDomains(ctx context.Context) (<-chan string, <-chan string, error) {
func (m *ImexManager) streamImexDomains(ctx context.Context) (chan string, chan string, error) {
// Create channels to stream IMEX domain ids that are added / removed
addedDomainCh := make(chan string)
removedDomainCh := make(chan string)
Expand Down Expand Up @@ -246,50 +286,6 @@ func (m *ImexManager) streamImexDomains(ctx context.Context) (<-chan string, <-c
return addedDomainCh, removedDomainCh, nil
}

// generateImexChannelPool generates the contents of a ResourceSlice pool for a given IMEX domain.
func generateImexChannelPool(imexDomain string, startChannel int, numChannels int) resourceslice.Pool {
// Generate channels from startChannel to offset+numChannels
var devices []resourceapi.Device
for i := startChannel; i < (startChannel + numChannels); i++ {
d := resourceapi.Device{
Name: fmt.Sprintf("imex-channel-%d", i),
Basic: &resourceapi.BasicDevice{
Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
"type": {
StringValue: ptr.To("imex-channel"),
},
"channel": {
IntValue: ptr.To(int64(i)),
},
},
},
}
devices = append(devices, d)
}

// Put them in a pool named after the IMEX domain with the IMEX domain label as a node selector
pool := resourceslice.Pool{
NodeSelector: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: ImexDomainLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{
imexDomain,
},
},
},
},
},
},
Devices: devices,
}

return pool
}

// cleanupResourceSlices removes all resource slices created by the IMEX manager.
func (m *ImexManager) cleanupResourceSlices() error {
// Delete all resource slices created by the IMEX manager
Expand Down Expand Up @@ -354,6 +350,7 @@ func (offsets imexDomainOffsets) add(imexDomain string, resourceSliceImexChannel
return offset, nil
}

// remove removes the offset where an IMEX domain's channels should start counting from.
func (offsets imexDomainOffsets) remove(imexDomain string) {
id := strings.SplitN(imexDomain, ".", 2)
if len(id) != 2 {
Expand All @@ -367,3 +364,47 @@ func (offsets imexDomainOffsets) remove(imexDomain string) {
delete(offsets, imexDomain)
}
}

// generateImexChannelPool generates the contents of a ResourceSlice pool for a given IMEX domain.
func generateImexChannelPool(imexDomain string, startChannel int, numChannels int) resourceslice.Pool {
// Generate channels from startChannel to offset+numChannels
var devices []resourceapi.Device
for i := startChannel; i < (startChannel + numChannels); i++ {
d := resourceapi.Device{
Name: fmt.Sprintf("imex-channel-%d", i),
Basic: &resourceapi.BasicDevice{
Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
"type": {
StringValue: ptr.To("imex-channel"),
},
"channel": {
IntValue: ptr.To(int64(i)),
},
},
},
}
devices = append(devices, d)
}

// Put them in a pool named after the IMEX domain with the IMEX domain label as a node selector
pool := resourceslice.Pool{
NodeSelector: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: ImexDomainLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{
imexDomain,
},
},
},
},
},
},
Devices: devices,
}

return pool
}

0 comments on commit 8424b28

Please sign in to comment.