Skip to content

Commit

Permalink
Merge pull request #1 from flexaihq/feat/kube-labels
Browse files Browse the repository at this point in the history
Add Pod Labels to DCGM Exporter Metrics
  • Loading branch information
jlouazel authored Dec 16, 2024
2 parents b97b763 + ed35054 commit 3f8f85d
Show file tree
Hide file tree
Showing 10 changed files with 458 additions and 131 deletions.
8 changes: 8 additions & 0 deletions pkg/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
CLIAddress = "address"
CLICollectInterval = "collect-interval"
CLIKubernetes = "kubernetes"
CLIKubernetesEnablePodLabels = "kubernetes-enable-pod-labels"
CLIKubernetesGPUIDType = "kubernetes-gpu-id-type"
CLIUseOldNamespace = "use-old-namespace"
CLIRemoteHEInfo = "remote-hostengine-info"
Expand Down Expand Up @@ -148,6 +149,12 @@ func NewApp(buildVersion ...string) *cli.App {
Usage: "Connect to remote hostengine at <HOST>:<PORT>",
EnvVars: []string{"DCGM_REMOTE_HOSTENGINE_INFO"},
},
&cli.BoolFlag{
Name: CLIKubernetesEnablePodLabels,
Value: false,
Usage: "Enable kubernetes pod labels in metrics. This parameter is effective only when the '--kubernetes' option is set to 'true'.",
EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_ENABLE_POD_LABELS"},
},
&cli.StringFlag{
Name: CLIKubernetesGPUIDType,
Value: string(dcgmexporter.GPUUID),
Expand Down Expand Up @@ -617,6 +624,7 @@ func contextToConfig(c *cli.Context) (*dcgmexporter.Config, error) {
Address: c.String(CLIAddress),
CollectInterval: c.Int(CLICollectInterval),
Kubernetes: c.Bool(CLIKubernetes),
KubernetesEnablePodLabels: c.Bool(CLIKubernetesEnablePodLabels),
KubernetesGPUIdType: dcgmexporter.KubernetesGPUIDType(c.String(CLIKubernetesGPUIDType)),
CollectDCP: true,
UseOldNamespace: c.Bool(CLIUseOldNamespace),
Expand Down
1 change: 1 addition & 0 deletions pkg/dcgmexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Config struct {
Address string
CollectInterval int
Kubernetes bool
KubernetesEnablePodLabels bool
KubernetesGPUIdType KubernetesGPUIDType
CollectDCP bool
UseOldNamespace bool
Expand Down
59 changes: 59 additions & 0 deletions pkg/dcgmexporter/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dcgmexporter
import (
"context"
"fmt"
"maps"
"net"
"regexp"
"slices"
Expand All @@ -28,6 +29,9 @@ import (
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"

"github.com/NVIDIA/dcgm-exporter/internal/pkg/nvmlprovider"
Expand All @@ -44,8 +48,25 @@ var (
// NewPodMapper builds a PodMapper bound to the configuration c.
//
// When c.KubernetesEnablePodLabels is false the mapper is returned without a
// Kubernetes client. Otherwise an in-cluster REST configuration is loaded and
// a clientset is created from it; a failure in either step is returned as a
// wrapped error and no mapper is produced.
func NewPodMapper(c *Config) (*PodMapper, error) {
	logrus.Infof("Kubernetes metrics collection enabled!")

	mapper := &PodMapper{Config: c}

	// The API client is only needed for the optional pod-label lookup.
	if c.KubernetesEnablePodLabels {
		restConfig, err := rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
		}

		apiClient, err := kubernetes.NewForConfig(restConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to get clientset: %w", err)
		}

		mapper.Client = apiClient
	}

	return mapper, nil
}

Expand Down Expand Up @@ -97,6 +118,8 @@ func (p *PodMapper) Process(metrics MetricsByCounter, sysInfo SystemInfo) error
metrics[counter][j].Attributes[oldNamespaceAttribute] = podInfo.Namespace
metrics[counter][j].Attributes[oldContainerAttribute] = podInfo.Container
}

maps.Copy(metrics[counter][j].Labels, podInfo.Labels)
}
}
}
Expand Down Expand Up @@ -155,10 +178,23 @@ func (p *PodMapper) toDeviceToPod(
}
}

labels := map[string]string{}
if p.Config.KubernetesEnablePodLabels {
if podLabels, err := p.getPodLabels(pod.GetNamespace(), pod.GetName()); err != nil {
logrus.WithFields(logrus.Fields{
"pod": pod.GetName(),
"namespace": pod.GetNamespace(),
}).Warnf("Couldn't get labels: %v", err)
} else {
labels = podLabels
}
}

podInfo := PodInfo{
Name: pod.GetName(),
Namespace: pod.GetNamespace(),
Container: container.GetName(),
Labels: labels,
}

for _, deviceID := range device.GetDeviceIds() {
Expand Down Expand Up @@ -199,3 +235,26 @@ func (p *PodMapper) toDeviceToPod(

return deviceToPodMap
}

// getPodLabels fetches the pod identified by namespace and podName from the
// Kubernetes API and returns its labels with every key passed through
// SanitizeLabelName.
//
// It returns an error when the mapper has no Kubernetes client or when the
// pod lookup fails. The lookup runs under a context that times out after
// connectionTimeout.
//
// Distinct label keys can collapse to the same sanitized name (for example
// "app.name" and "app-name" both become "app_name"). Keys are therefore
// visited in sorted order so that the surviving value is the same on every
// call rather than depending on Go's randomized map iteration order; on a
// collision the value of the lexicographically greatest original key wins.
func (p *PodMapper) getPodLabels(namespace, podName string) (map[string]string, error) {
	if p.Client == nil {
		return nil, fmt.Errorf("kubernetes client is not initialized")
	}

	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	pod, err := p.Client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	// Collect and sort the original keys to make collision handling
	// deterministic.
	keys := make([]string, 0, len(pod.Labels))
	for k := range pod.Labels {
		keys = append(keys, k)
	}
	slices.Sort(keys)

	sanitizedLabels := make(map[string]string, len(keys))
	for _, k := range keys {
		sanitizedLabels[SanitizeLabelName(k)] = pod.Labels[k]
	}

	return sanitizedLabels, nil
}
76 changes: 76 additions & 0 deletions pkg/dcgmexporter/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"

"github.com/NVIDIA/dcgm-exporter/internal/pkg/nvmlprovider"
Expand Down Expand Up @@ -335,3 +339,75 @@ func TestProcessPodMapper_WithD_Different_Format_Of_DeviceID(t *testing.T) {
})
}
}

// TestProcessPodMapper_WithLabels checks that PodMapper.Process, with pod
// labels enabled, copies each pod's labels (keys run through
// SanitizeLabelName) into the Labels map of the corresponding metric.
func TestProcessPodMapper_WithLabels(t *testing.T) {
	type podFixture struct {
		name   string
		labels map[string]string
	}
	fixtures := []podFixture{
		{name: "gpu-pod-0", labels: map[string]string{"valid_label_key": "label-value"}},
		{name: "gpu-pod-1", labels: map[string]string{"invalid.label/key": "another-value"}},
	}

	// Seed the fake Kubernetes API with one pod object per fixture.
	objects := make([]runtime.Object, 0, len(fixtures))
	for _, fixture := range fixtures {
		objects = append(objects, &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fixture.name,
				Namespace: "default",
				Labels:    fixture.labels,
			},
		})
	}
	clientset := fake.NewSimpleClientset(objects...)

	tmpDir, cleanup := CreateTmpDir(t)
	defer cleanup()
	socketPath := tmpDir + "/kubelet.sock"

	// Serve a mock pod-resources API on the temporary socket.
	gpus := []string{"gpu-uuid-0", "gpu-uuid-1"}
	server := grpc.NewServer()
	podresourcesapi.RegisterPodResourcesListerServer(server, NewPodResourcesMockServer(nvidiaResourceName, gpus))
	cleanupServer := StartMockServer(t, server, socketPath)
	defer cleanupServer()

	podMapper := &PodMapper{
		Config: &Config{
			KubernetesEnablePodLabels: true,
			KubernetesGPUIdType:       GPUUID,
			PodResourcesKubeletSocket: socketPath,
		},
		Client: clientset,
	}

	// One metric per GPU, all under the same counter.
	counter := Counter{
		FieldID:   155,
		FieldName: "DCGM_FI_DEV_POWER_USAGE",
		PromType:  "gauge",
	}
	series := make([]Metric, 0, len(gpus))
	for idx, uuid := range gpus {
		series = append(series, Metric{
			GPU:        fmt.Sprint(idx),
			GPUUUID:    uuid,
			Attributes: map[string]string{},
			Labels:     map[string]string{},
		})
	}
	metrics := MetricsByCounter{}
	metrics[counter] = series

	var sysInfo SystemInfo
	require.NoError(t, podMapper.Process(metrics, sysInfo))

	// NOTE(review): this presumes the mock server pairs gpus[i] with
	// "gpu-pod-i" — confirm against NewPodResourcesMockServer.
	for idx := range metrics[counter] {
		gotLabels := metrics[counter][idx].Labels
		for key, value := range fixtures[idx].labels {
			sanitizedKey := SanitizeLabelName(key)

			require.Contains(t, gotLabels, sanitizedKey,
				"Expected sanitized key '%s' to exist", sanitizedKey)
			require.Equal(t, value, gotLabels[sanitizedKey],
				"Expected sanitized key '%s' to map to value '%s'", sanitizedKey, value)
		}
	}
}
3 changes: 3 additions & 0 deletions pkg/dcgmexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/NVIDIA/go-dcgm/pkg/dcgm"
"github.com/prometheus/exporter-toolkit/web"
"k8s.io/client-go/kubernetes"
)

var (
Expand Down Expand Up @@ -142,12 +143,14 @@ type MetricsServer struct {

// PodMapper enriches GPU metrics with information about the Kubernetes pod
// that a device is assigned to.
type PodMapper struct {
// Config is the exporter configuration the mapper was created with.
Config *Config
// Client is the Kubernetes API client used to look up pod labels.
// NewPodMapper leaves it nil when Config.KubernetesEnablePodLabels is false.
Client kubernetes.Interface
}

// PodInfo describes the pod and container that a GPU device is allocated to.
type PodInfo struct {
// Name is the pod name.
Name string
// Namespace is the namespace the pod lives in.
Namespace string
// Container is the name of the container within the pod.
Container string
// Labels holds the pod's labels keyed by sanitized label name. It is an
// empty map when pod labels are disabled or could not be fetched.
Labels map[string]string
}

// MetricsByCounter represents a map where each Counter is associated with a slice of Metric objects
Expand Down
7 changes: 7 additions & 0 deletions pkg/dcgmexporter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (
"bytes"
"encoding/gob"
"fmt"
"regexp"
"sync"
"time"
)

// invalidLabelCharRE matches any single character that is not an ASCII
// letter, an ASCII digit, or an underscore.
var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

func WaitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
c := make(chan struct{})
go func() {
Expand Down Expand Up @@ -63,3 +66,7 @@ func deepCopy[T any](src T) (dst T, err error) {

return dst, nil
}

// SanitizeLabelName converts s into a string usable as a Prometheus label
// name.
//
// Every character outside [a-zA-Z0-9_] is replaced with an underscore. If the
// result starts with a digit, an underscore is prepended: Prometheus label
// names must match [a-zA-Z_][a-zA-Z0-9_]*, whereas Kubernetes label keys may
// begin with a digit (for example "1password.com/team"). An empty input is
// returned unchanged.
func SanitizeLabelName(s string) string {
	sanitized := invalidLabelCharRE.ReplaceAllString(s, "_")
	if sanitized != "" && sanitized[0] >= '0' && sanitized[0] <= '9' {
		return "_" + sanitized
	}
	return sanitized
}
23 changes: 23 additions & 0 deletions pkg/dcgmexporter/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,26 @@ func TestDeepCopy(t *testing.T) {
assert.Error(t, err)
})
}

func TestSanitizeLabelName(t *testing.T) {
t.Run("Sanitize label with invalid characters", func(t *testing.T) {
input := "label.with.dots/and-slashes"
expected := "label_with_dots_and_slashes"
got := SanitizeLabelName(input)
assert.Equal(t, expected, got)
})

t.Run("Sanitize label with special characters", func(t *testing.T) {
input := "label@with#special!chars"
expected := "label_with_special_chars"
got := SanitizeLabelName(input)
assert.Equal(t, expected, got)
})

t.Run("Keep valid label unchanged", func(t *testing.T) {
input := "valid_label_name"
expected := "valid_label_name"
got := SanitizeLabelName(input)
assert.Equal(t, expected, got)
})
}
Loading

0 comments on commit 3f8f85d

Please sign in to comment.