Add pod owner attributes for RDMA metrics
Signed-off-by: lou-lan <[email protected]>
lou-lan committed Nov 21, 2024
1 parent 335952e commit dd6ab57
Showing 19 changed files with 1,382 additions and 30 deletions.
916 changes: 916 additions & 0 deletions charts/spiderpool/files/grafana-rdma-workload.json

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions charts/spiderpool/templates/role.yaml
@@ -213,3 +213,14 @@ rules:
- patch
- update
- watch
---
# For the RDMA metrics exporter: read the owner chain of RDMA Pods.
# For example, an RDMA Pod may be owned by a Job, and that Job by a CronJob.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: spiderpool-read-all-resources
rules:
- apiGroups: ["*"]
resources: ["*"]
verbs: ["get", "list", "watch"]
15 changes: 14 additions & 1 deletion charts/spiderpool/templates/role_binding.yaml
@@ -19,4 +19,17 @@ subjects:
name: {{ .Values.spiderpoolInit.name | trunc 63 | trimSuffix "-" }}
namespace: {{ .Release.Namespace }}
{{- end }}
{{- end }}
{{- end }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: spiderpool-read-all-resources
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: spiderpool-read-all-resources
subjects:
- kind: ServiceAccount
name: {{ .Values.spiderpoolAgent.name | trunc 63 | trimSuffix "-" }}
namespace: {{ .Release.Namespace }}
4 changes: 4 additions & 0 deletions cmd/spiderpool-agent/cmd/config.go
@@ -14,6 +14,7 @@ import (
"github.com/spf13/pflag"
"go.uber.org/atomic"
"gopkg.in/yaml.v3"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/spidernet-io/spiderpool/api/v1/agent/client"
@@ -121,6 +122,9 @@ type AgentContext struct {
SubnetManager subnetmanager.SubnetManager
KubevirtManager kubevirtmanager.KubevirtManager

// k8s client
ClientSet *kubernetes.Clientset

// handler
HttpServer *server.Server
UnixServer *server.Server
18 changes: 18 additions & 0 deletions cmd/spiderpool-agent/cmd/daemon.go
@@ -20,6 +20,7 @@ import (
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
@@ -147,6 +148,13 @@ func DaemonMain() {
}
agentContext.CRDManager = mgr

logger.Info("Begin to initialize k8s clientSet")
clientSet, err := initK8sClientSet()
if nil != err {
logger.Fatal(err.Error())
}
agentContext.ClientSet = clientSet

logger.Info("Begin to initialize spiderpool-agent metrics HTTP server")
initAgentMetricsServer(agentContext.InnerCtx)

@@ -325,6 +333,16 @@ func waitAPIServerReady(ctx context.Context) error {
return errors.New("failed to talk to API Server")
}

// initK8sClientSet creates a new Kubernetes clientset from the controller-runtime config.
func initK8sClientSet() (*kubernetes.Clientset, error) {
clientSet, err := kubernetes.NewForConfig(ctrl.GetConfigOrDie())
if nil != err {
return nil, fmt.Errorf("failed to init K8s clientset: %v", err)
}

return clientSet, nil
}

func initAgentServiceManagers(ctx context.Context) {
logger.Debug("Begin to initialize Node manager")
nodeManager, err := nodemanager.NewNodeManager(
17 changes: 16 additions & 1 deletion cmd/spiderpool-agent/cmd/metrics_server.go
@@ -8,8 +8,11 @@ import (
"fmt"
"net/http"

"k8s.io/client-go/informers"

"github.com/spidernet-io/spiderpool/pkg/constant"
"github.com/spidernet-io/spiderpool/pkg/metric"
"github.com/spidernet-io/spiderpool/pkg/podownercache"
)

// initAgentMetricsServer will start an opentelemetry http server for spiderpool agent.
@@ -23,7 +26,19 @@ func initAgentMetricsServer(ctx context.Context) {
logger.Fatal(err.Error())
}

err = metric.InitSpiderpoolAgentMetrics(ctx, agentContext.Cfg.EnableRDMAMetric, agentContext.CRDManager.GetClient())
var cache podownercache.CacheInterface
if agentContext.Cfg.EnableRDMAMetric { //nolint:golint
informerFactory := informers.NewSharedInformerFactory(agentContext.ClientSet, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()
informerFactory.Start(ctx.Done())

cache, err = podownercache.New(ctx, podInformer, agentContext.CRDManager.GetClient())
if err != nil {
logger.Fatal(err.Error())
}
}

err = metric.InitSpiderpoolAgentMetrics(ctx, cache)
if nil != err {
logger.Fatal(err.Error())
}
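The hunk above starts the shared informer factory and builds the pod-owner cache only when EnableRDMAMetric is set. A minimal sketch, assuming one also wants to block until the pod informer has synced before the RDMA metrics are registered (an explicit wait that this commit does not show):

```go
// Sketch only (not part of this commit): wait for the pod informer backing
// the owner cache to sync, so the first metrics scrape already has owner
// attributes available. Assumes: import k8scache "k8s.io/client-go/tools/cache".
informerFactory := informers.NewSharedInformerFactory(agentContext.ClientSet, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()
informerFactory.Start(ctx.Done())
if !k8scache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) {
	logger.Fatal("timed out waiting for the pod informer to sync")
}
```

Whether such a wait is needed depends on how tolerant the exporter is of missing owner labels on the first few scrapes.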
Binary file added docs/images/rdma/rdma-cluster.png
Binary file added docs/images/rdma/rdma-node1.png
Binary file added docs/images/rdma/rdma-node2.png
Binary file added docs/images/rdma/rdma-pod.png
Binary file added docs/images/rdma/rdma-workload.png
16 changes: 16 additions & 0 deletions docs/usage/rdma-metrics-zh_CN.md
@@ -83,3 +83,19 @@ helm upgrade --install spiderpool spiderpool/spiderpool --reuse-values --wait --
| rx_vport_rdma_multicast_bytes | Counter | Number of bytes received in multicast RDMA packets | |
| tx_vport_rdma_multicast_bytes | Counter | Number of bytes transmitted in multicast RDMA packets | |
| vport_speed_mbps | Speed | Speed of the port in megabits per second (Mbps) | |

## Grafana Monitoring Dashboard

The Grafana RDMA Cluster dashboard shows RDMA metrics for every node in the current cluster.
![RDMA Dashboard](../images/rdma/rdma-cluster.png)

The Grafana RDMA Node dashboard shows RDMA metrics for each physical NIC and that NIC's bandwidth utilization. It also provides VF NIC statistics for the node's host, as well as metrics for Pods on the node that use RDMA NICs.
![RDMA Dashboard](../images/rdma/rdma-node1.png)
<br>
![RDMA Dashboard](../images/rdma/rdma-node2.png)

The Grafana RDMA Pod dashboard shows RDMA metrics for each NIC inside a Pod, together with NIC error counters that help with troubleshooting.
![RDMA Dashboard](../images/rdma/rdma-pod.png)

The Grafana RDMA Workload dashboard shows RDMA metrics per top-level resource. For AI inference and training, a group of Pods is typically launched through top-level resources such as Job, Deployment, or KServe CRs, and this dashboard aggregates RDMA metrics for each of those resources.
![RDMA Dashboard](../images/rdma/rdma-workload.png)
16 changes: 16 additions & 0 deletions docs/usage/rdma-metrics.md
@@ -83,3 +83,19 @@ Below is a table containing "Metric Name," "Metric Type," "Metric Meaning," and
| rx_vport_rdma_multicast_bytes | Counter | Number of bytes received in multicast RDMA packets | |
| tx_vport_rdma_multicast_bytes | Counter | Number of bytes transmitted in multicast RDMA packets | |
| vport_speed_mbps | Speed | Speed of the port in Mbps | |

## Grafana Monitoring Dashboard

The Grafana RDMA Cluster monitoring dashboard provides a view of the RDMA metrics for each node in the current cluster.
![RDMA Dashboard](../images/rdma/rdma-cluster.png)

The Grafana RDMA Node monitoring dashboard displays RDMA metrics for each physical NIC (Network Interface Card) and the bandwidth utilization of those NICs. It also includes statistics for VF NICs on the host node and monitoring metrics for Pods using RDMA NICs on that node.
![RDMA Dashboard](../images/rdma/rdma-node1.png)
<br>
![RDMA Dashboard](../images/rdma/rdma-node2.png)

The Grafana RDMA Pod monitoring dashboard provides RDMA metrics for each NIC within a Pod, along with NIC error statistics. These metrics help in troubleshooting issues.
![RDMA Dashboard](../images/rdma/rdma-pod.png)

The Grafana RDMA Workload monitoring dashboard shows RDMA metrics aggregated by top-level resource, such as Jobs, Deployments, and KServe services, which typically launch a set of Pods for AI inference and training tasks.
![RDMA Dashboard](../images/rdma/rdma-workload.png)
2 changes: 1 addition & 1 deletion pkg/ippoolmanager/ippool_manager_suite_test.go
@@ -71,7 +71,7 @@ var _ = BeforeSuite(func() {
Build()
_, err = metric.InitMetric(context.TODO(), constant.SpiderpoolAgent, false, false)
Expect(err).NotTo(HaveOccurred())
err = metric.InitSpiderpoolAgentMetrics(context.TODO(), false, fakeClient)
err = metric.InitSpiderpoolAgentMetrics(context.TODO(), nil)
Expect(err).NotTo(HaveOccurred())

tracker = k8stesting.NewObjectTracker(scheme, k8sscheme.Codecs.UniversalDecoder())
8 changes: 4 additions & 4 deletions pkg/metric/metrics_instance.go
@@ -9,9 +9,9 @@ import (

"go.opentelemetry.io/otel/attribute"
api "go.opentelemetry.io/otel/metric"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/spidernet-io/spiderpool/pkg/lock"
"github.com/spidernet-io/spiderpool/pkg/podownercache"
"github.com/spidernet-io/spiderpool/pkg/rdmametrics"
)

@@ -221,10 +221,10 @@ func (a *asyncInt64Gauge) Record(value int64, attrs ...attribute.KeyValue) {
}

// InitSpiderpoolAgentMetrics serves for spiderpool agent metrics initialization
func InitSpiderpoolAgentMetrics(ctx context.Context, enableRDMAMetric bool, client client.Client) error {
func InitSpiderpoolAgentMetrics(ctx context.Context, cache podownercache.CacheInterface) error {
// for rdma
if enableRDMAMetric {
err := rdmametrics.Register(ctx, meter, client)
if cache != nil {
err := rdmametrics.Register(ctx, meter, cache)
if err != nil {
return err
}
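The new signature replaces the enableRDMAMetric flag with a pod-owner cache: a non-nil cache turns RDMA metric registration on, and nil (as in the updated unit test) leaves it off. A hedged illustration of the two call forms, reusing the `cache` variable from the agent wiring above:

```go
// Illustration only, mirroring the two call sites in this commit.
// A non-nil cache registers RDMA metrics with pod-owner attributes:
if err := metric.InitSpiderpoolAgentMetrics(ctx, cache); err != nil {
	logger.Fatal(err.Error())
}

// A nil cache (unit tests, or EnableRDMAMetric=false) skips rdmametrics.Register:
if err := metric.InitSpiderpoolAgentMetrics(context.TODO(), nil); err != nil {
	logger.Fatal(err.Error())
}
```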
172 changes: 172 additions & 0 deletions pkg/podownercache/pod_owner_cache.go
@@ -0,0 +1,172 @@
// Copyright 2024 Authors of spidernet-io
// SPDX-License-Identifier: Apache-2.0

package podownercache

import (
"context"
"fmt"
"github.com/spidernet-io/spiderpool/pkg/lock"
"github.com/spidernet-io/spiderpool/pkg/logutils"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PodOwnerCache struct {
ctx context.Context
apiReader client.Reader

cacheLock lock.RWMutex
pods map[types.NamespacedName]Pod
ipToPod map[string]types.NamespacedName
}

type Pod struct {
types.NamespacedName
OwnerInfo OwnerInfo
IPs []string
}

type OwnerInfo struct {
APIVersion string
Kind string
Namespace string
Name string
}

type CacheInterface interface {
GetPodByIP(ip string) *Pod
}

var logger *zap.Logger

func New(ctx context.Context, podInformer cache.SharedIndexInformer, apiReader client.Reader) (CacheInterface, error) {
logger = logutils.Logger.Named("PodOwnerCache")
logger.Info("create PodOwnerCache informer")

res := &PodOwnerCache{
ctx: ctx,
apiReader: apiReader,
cacheLock: lock.RWMutex{},
pods: make(map[types.NamespacedName]Pod),
ipToPod: make(map[string]types.NamespacedName),
}

_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: res.onPodAdd,
UpdateFunc: res.onPodUpdate,
DeleteFunc: res.onPodDel,
})
if nil != err {
logger.Error(err.Error())
return nil, err
}

return res, nil
}

func (s *PodOwnerCache) onPodAdd(obj interface{}) {
if pod, ok := obj.(*corev1.Pod); ok {
if pod.Spec.HostNetwork {
return
}

if len(pod.Status.PodIPs) > 0 {
ips := make([]string, 0, len(pod.Status.PodIPs))
for _, p := range pod.Status.PodIPs {
ips = append(ips, p.IP)
}
owner, err := s.getFinalOwner(pod)
if err != nil {
logger.Warn("failed to get final owner of pod", zap.Error(err))
return
}
if owner == nil {
// The Pod has no owner references (e.g. a bare Pod); use an empty OwnerInfo
// rather than dereferencing a nil pointer below.
owner = &OwnerInfo{}
}

s.cacheLock.Lock()
defer s.cacheLock.Unlock()
key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
s.pods[key] = Pod{
NamespacedName: key,
OwnerInfo: *owner,
IPs: ips,
}
for _, ip := range ips {
s.ipToPod[ip] = key
}
}
}
}

func (s *PodOwnerCache) onPodUpdate(oldObj, newObj interface{}) {
s.onPodAdd(newObj)
}

func (s *PodOwnerCache) onPodDel(obj interface{}) {
if pod, ok := obj.(*corev1.Pod); ok {
s.cacheLock.Lock()
defer s.cacheLock.Unlock()

key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
if _, ok := s.pods[key]; !ok {
return
}

for _, ip := range s.pods[key].IPs {
delete(s.ipToPod, ip)
}
delete(s.pods, key)
}
}

func (s *PodOwnerCache) getFinalOwner(obj metav1.Object) (*OwnerInfo, error) {
var finalOwner *OwnerInfo

for {
ownerRefs := obj.GetOwnerReferences()
if len(ownerRefs) == 0 {
break
}

ownerRef := ownerRefs[0] // Assuming the first owner reference
finalOwner = &OwnerInfo{
APIVersion: ownerRef.APIVersion,
Kind: ownerRef.Kind,
Namespace: obj.GetNamespace(),
Name: ownerRef.Name,
}

// Prepare an empty object of the owner kind
ownerObj := &unstructured.Unstructured{}
ownerObj.SetAPIVersion(ownerRef.APIVersion)
ownerObj.SetKind(ownerRef.Kind)

err := s.apiReader.Get(s.ctx, client.ObjectKey{
Namespace: obj.GetNamespace(),
Name: ownerRef.Name,
}, ownerObj)
if err != nil {
return nil, fmt.Errorf("error fetching owner: %v", err)
}

// Set obj to the current owner to continue the loop
obj = ownerObj
}

return finalOwner, nil
}

func (s *PodOwnerCache) GetPodByIP(ip string) *Pod {
s.cacheLock.RLock()
defer s.cacheLock.RUnlock()
item, exists := s.ipToPod[ip]
if !exists {
return nil
}
pod, exists := s.pods[item]
if !exists {
return nil
}

return &pod
}
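For orientation, a hedged sketch of how an exporter might consume this cache: look up a Pod by one of its IPs and turn the result into OpenTelemetry attributes. The helper name and attribute keys are illustrative assumptions, not the labels actually emitted by the rdmametrics package.

```go
package example // illustrative helper, not part of this commit

import (
	"go.opentelemetry.io/otel/attribute"

	"github.com/spidernet-io/spiderpool/pkg/podownercache"
)

// ownerAttrs converts a cache lookup into OpenTelemetry attributes for an
// RDMA metric sample. The attribute keys here are assumed names.
func ownerAttrs(c podownercache.CacheInterface, podIP string) []attribute.KeyValue {
	p := c.GetPodByIP(podIP)
	if p == nil {
		// Host-network Pods and unknown IPs are not tracked by the cache.
		return nil
	}
	return []attribute.KeyValue{
		attribute.String("pod_namespace", p.Namespace), // from the embedded NamespacedName
		attribute.String("pod_name", p.Name),
		attribute.String("owner_api_version", p.OwnerInfo.APIVersion),
		attribute.String("owner_kind", p.OwnerInfo.Kind), // e.g. "CronJob" for a Pod -> Job -> CronJob chain
		attribute.String("owner_name", p.OwnerInfo.Name),
	}
}
```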
