diff --git a/Makefile b/Makefile index 8b5d58cd20..fc7b587c2a 100644 --- a/Makefile +++ b/Makefile @@ -129,7 +129,7 @@ e2e/single-az: bin/helm bin/ginkgo TEST_PATH=./tests/e2e/... \ GINKGO_FOCUS="\[ebs-csi-e2e\] \[single-az\]" \ GINKGO_PARALLEL=5 \ - HELM_EXTRA_FLAGS="--set=controller.volumeModificationFeature.enabled=true,sidecars.provisioner.additionalArgs[0]='--feature-gates=VolumeAttributesClass=true',sidecars.resizer.additionalArgs[0]='--feature-gates=VolumeAttributesClass=true'" \ + HELM_EXTRA_FLAGS="--set=controller.volumeModificationFeature.enabled=true,sidecars.provisioner.additionalArgs[0]='--feature-gates=VolumeAttributesClass=true',sidecars.resizer.additionalArgs[0]='--feature-gates=VolumeAttributesClass=true',node.enableMetrics=true" \ ./hack/e2e/run.sh .PHONY: e2e/multi-az diff --git a/charts/aws-ebs-csi-driver/templates/_node.tpl b/charts/aws-ebs-csi-driver/templates/_node.tpl index 49464cbcba..429d48c016 100644 --- a/charts/aws-ebs-csi-driver/templates/_node.tpl +++ b/charts/aws-ebs-csi-driver/templates/_node.tpl @@ -70,6 +70,12 @@ spec: {{- with .Values.node.reservedVolumeAttachments }} - --reserved-volume-attachments={{ . }} {{- end }} + {{- if .Values.node.enableMetrics }} + - --http-endpoint=0.0.0.0:3302 + {{- end}} + {{- with .Values.node.kubeletPath }} + - --csi-mount-point-prefix={{ . }} + {{- end}} {{- with .Values.node.volumeAttachLimit }} - --volume-attach-limit={{ . }} {{- end }} diff --git a/charts/aws-ebs-csi-driver/templates/metrics.yaml b/charts/aws-ebs-csi-driver/templates/metrics.yaml index 9792af4e60..0baea47681 100644 --- a/charts/aws-ebs-csi-driver/templates/metrics.yaml +++ b/charts/aws-ebs-csi-driver/templates/metrics.yaml @@ -40,3 +40,21 @@ spec: interval: {{ .Values.controller.serviceMonitor.interval | default "15s"}} {{- end }} {{- end }} +--- +{{- if .Values.node.enableMetrics -}} +apiVersion: v1 +kind: Service +metadata: + name: ebs-csi-node + namespace: {{ .Release.Namespace }} + labels: + app: ebs-csi-node +spec: + selector: + app: ebs-csi-node + ports: + - name: metrics + port: 3302 + targetPort: 3302 + type: ClusterIP +{{- end }} diff --git a/cmd/main.go b/cmd/main.go index 3579f23350..9659415afe 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -134,11 +134,6 @@ func main() { }() } - if options.HTTPEndpoint != "" { - r := metrics.InitializeRecorder() - r.InitializeMetricsHandler(options.HTTPEndpoint, "/metrics", options.MetricsCertFile, options.MetricsKeyFile) - } - cfg := metadata.MetadataServiceConfig{ EC2MetadataClient: metadata.DefaultEC2MetadataClient, K8sAPIClient: metadata.DefaultKubernetesAPIClient(options.Kubeconfig), @@ -159,6 +154,15 @@ func main() { md, metadataErr = metadata.NewMetadataService(cfg, region) } + if options.HTTPEndpoint != "" { + r := metrics.InitializeRecorder() + r.InitializeMetricsHandler(options.HTTPEndpoint, "/metrics", options.MetricsCertFile, options.MetricsKeyFile) + + if options.Mode == driver.NodeMode || options.Mode == driver.AllMode { + metrics.InitializeNVME(r, options.CsiMountPointPath, md.GetInstanceID()) + } + } + if metadataErr != nil { klog.ErrorS(metadataErr, "Failed to initialize metadata when it is required") if options.Mode == driver.ControllerMode { diff --git a/deploy/kubernetes/base/node.yaml b/deploy/kubernetes/base/node.yaml index 58ad6c4b21..e72e62fe5a 100644 --- a/deploy/kubernetes/base/node.yaml +++ b/deploy/kubernetes/base/node.yaml @@ -59,6 +59,7 @@ spec: args: - node - --endpoint=$(CSI_ENDPOINT) + - --csi-mount-point-prefix=/var/lib/kubelet - --logging-format=text - --v=2 env: diff --git 
a/go.mod b/go.mod index 95e4f1ca7b..693888ad05 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0 github.com/onsi/ginkgo/v2 v2.21.0 github.com/onsi/gomega v1.35.0 + github.com/prometheus/client_golang v1.20.5 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 @@ -99,7 +100,6 @@ require ( github.com/opencontainers/selinux v1.11.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.20.5 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.60.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 0c4ab698e0..eccf4cf450 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -648,6 +648,9 @@ func (d *NodeService) nodePublishVolumeForBlock(req *csi.NodePublishVolumeReques } // Create the mount point as a file since bind mount device node requires it to be a file + // This implementation detail is relied upon by the NVMECollector, + // which discovers block devices by parsing /proc/self/mountinfo. The bind mount + // created here ensures block devices appear in mountinfo even without a filesystem. klog.V(4).InfoS("NodePublishVolume [block]: making target file", "target", target) if err = d.mounter.MakeFile(target); err != nil { if removeErr := os.Remove(target); removeErr != nil { diff --git a/pkg/driver/options.go b/pkg/driver/options.go index adf291ad5a..9e1ea4d085 100644 --- a/pkg/driver/options.go +++ b/pkg/driver/options.go @@ -89,6 +89,8 @@ type Options struct { WindowsHostProcess bool // LegacyXFSProgs formats XFS volumes with `bigtime=0,inobtcount=0,reflink=0`, so that they can be mounted onto nodes with linux kernel ≤ v5.4. Volumes formatted with this option may experience issues after 2038, and will be unable to use some XFS features (for example, reflinks). LegacyXFSProgs bool + // CsiMountPointPath is the path where CSI volumes are expected to be mounted on the node. + CsiMountPointPath string } func (o *Options) AddFlags(f *flag.FlagSet) { @@ -118,6 +120,7 @@ func (o *Options) AddFlags(f *flag.FlagSet) { f.IntVar(&o.ReservedVolumeAttachments, "reserved-volume-attachments", -1, "Number of volume attachments reserved for system use. Not used when --volume-attach-limit is specified. The total amount of volume attachments for a node is computed as: - - . When -1, the amount of reserved attachments is loaded from instance metadata that captured state at node boot and may include not only system disks but also CSI volumes.") f.BoolVar(&o.WindowsHostProcess, "windows-host-process", false, "ALPHA: Indicates whether the driver is running in a Windows privileged container") f.BoolVar(&o.LegacyXFSProgs, "legacy-xfs", false, "Warning: This option will be removed in a future version of EBS CSI Driver. Formats XFS volumes with `bigtime=0,inobtcount=0,reflink=0`, so that they can be mounted onto nodes with linux kernel ≤ v5.4. Volumes formatted with this option may experience issues after 2038, and will be unable to use some XFS features (for example, reflinks).") + f.StringVar(&o.CsiMountPointPath, "csi-mount-point-prefix", "", "A prefix of the mountpoints of all CSI-managed volumes. 
If this value is non-empty, all volumes mounted to a path beginning with the provided value are assumed to be CSI volumes owned by the EBS CSI Driver and safe to treat as such (for example, by exposing volume metrics).") } } diff --git a/pkg/driver/options_test.go b/pkg/driver/options_test.go index 0cb8271c1f..0bea2edb81 100644 --- a/pkg/driver/options_test.go +++ b/pkg/driver/options_test.go @@ -74,6 +74,10 @@ func TestAddFlags(t *testing.T) { t.Errorf("error setting legacy-xfs: %v", err) } + if err := f.Set("csi-mount-point-prefix", "/var/lib/kubelet"); err != nil { + t.Errorf("error setting csi-mount-point-prefix: %v", err) + } + if o.Endpoint != "custom-endpoint" { t.Errorf("unexpected Endpoint: got %s, want custom-endpoint", o.Endpoint) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 85596ccc90..64b8fd708d 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -19,7 +19,8 @@ import ( "sync" "time" - "k8s.io/component-base/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/klog/v2" ) @@ -29,7 +30,7 @@ var ( ) type metricRecorder struct { - registry metrics.KubeRegistry + registry *prometheus.Registry metrics map[string]interface{} } @@ -43,13 +44,18 @@ func Recorder() *metricRecorder { func InitializeRecorder() *metricRecorder { once.Do(func() { r = &metricRecorder{ - registry: metrics.NewKubeRegistry(), + registry: prometheus.NewRegistry(), metrics: make(map[string]interface{}), } }) return r } +// InitializeNVME registers the NVMe collector for gathering metrics from NVMe devices. +func InitializeNVME(r *metricRecorder, csiMountPointPath, instanceID string) { + registerNVMECollector(r, csiMountPointPath, instanceID) +} + // IncreaseCount increases the counter metric by 1. 
func (m *metricRecorder) IncreaseCount(name string, labels map[string]string) { if m == nil { @@ -65,7 +71,7 @@ func (m *metricRecorder) IncreaseCount(name string, labels map[string]string) { return } - metricAsCounterVec, ok := metric.(*metrics.CounterVec) + metricAsCounterVec, ok := metric.(*prometheus.CounterVec) if ok { metricAsCounterVec.With(labels).Inc() } else { @@ -87,7 +93,7 @@ func (m *metricRecorder) ObserveHistogram(name string, value float64, labels map return } - metricAsHistogramVec, ok := metric.(*metrics.HistogramVec) + metricAsHistogramVec, ok := metric.(*prometheus.HistogramVec) if ok { metricAsHistogramVec.With(labels).Observe(value) } else { @@ -103,11 +109,7 @@ func (m *metricRecorder) InitializeMetricsHandler(address, path, certFile, keyFi } mux := http.NewServeMux() - mux.Handle(path, metrics.HandlerFor( - m.registry, - metrics.HandlerOpts{ - ErrorHandling: metrics.ContinueOnError, - })) + mux.Handle(path, promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError})) server := &http.Server{ Addr: address, @@ -136,7 +138,14 @@ func (m *metricRecorder) registerHistogramVec(name, help string, labels []string if _, exists := m.metrics[name]; exists { return } - histogram := createHistogramVec(name, help, labels, buckets) + histogram := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: name, + Help: help, + Buckets: buckets, + }, + labels, + ) m.metrics[name] = histogram m.registry.MustRegister(histogram) } @@ -145,30 +154,15 @@ func (m *metricRecorder) registerCounterVec(name, help string, labels []string) if _, exists := m.metrics[name]; exists { return } - counter := createCounterVec(name, help, labels) - m.metrics[name] = counter - m.registry.MustRegister(counter) -} - -func createHistogramVec(name, help string, labels []string, buckets []float64) *metrics.HistogramVec { - opts := &metrics.HistogramOpts{ - Name: name, - Help: help, - StabilityLevel: metrics.ALPHA, - Buckets: buckets, - } - return metrics.NewHistogramVec(opts, labels) -} - -func createCounterVec(name, help string, labels []string) *metrics.CounterVec { - return metrics.NewCounterVec( - &metrics.CounterOpts{ - Name: name, - Help: help, - StabilityLevel: metrics.ALPHA, + counter := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: name, + Help: help, }, labels, ) + m.metrics[name] = counter + m.registry.MustRegister(counter) } func getLabelNames(labels map[string]string) []string { diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 070b5a8302..c9a0671f9f 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -34,7 +34,7 @@ func TestMetricRecorder(t *testing.T) { m.IncreaseCount("test_total", map[string]string{"key": "value"}) }, expected: ` -# HELP test_total [ALPHA] ebs_csi_aws_com metric +# HELP test_total ebs_csi_aws_com metric # TYPE test_total counter test_total{key="value"} 1 `, @@ -46,7 +46,7 @@ test_total{key="value"} 1 m.ObserveHistogram("test", 1.5, map[string]string{"key": "value"}, []float64{1, 2, 3}) }, expected: ` -# HELP test [ALPHA] ebs_csi_aws_com metric +# HELP test ebs_csi_aws_com metric # TYPE test histogram test{key="value",le="1"} 0 test{key="value",le="2"} 1 @@ -66,7 +66,7 @@ test_count{key="value"} 1 m.IncreaseCount("test_re_register_total", map[string]string{"key": "value2"}) }, expected: ` -# HELP test_re_register_total [ALPHA] ebs_csi_aws_com metric +# HELP test_re_register_total ebs_csi_aws_com metric # TYPE test_re_register_total counter 
test_re_register_total{key="value1"} 2 test_re_register_total{key="value2"} 1 diff --git a/pkg/metrics/nvme.go b/pkg/metrics/nvme.go new file mode 100644 index 0000000000..515017bc69 --- /dev/null +++ b/pkg/metrics/nvme.go @@ -0,0 +1,418 @@ +//go:build linux +// +build linux + +// Copyright 2024 The Kubernetes Authors. + +// Licensed under the Apache License, Version 2.0 (the 'License'); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math" + "os" + "os/exec" + "strings" + "time" + "unsafe" + + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sys/unix" + "k8s.io/klog/v2" +) + +// EBSMetrics represents the parsed metrics from the NVMe log page. +type EBSMetrics struct { + EBSMagic uint64 + ReadOps uint64 + WriteOps uint64 + ReadBytes uint64 + WriteBytes uint64 + TotalReadTime uint64 + TotalWriteTime uint64 + EBSIOPSExceeded uint64 + EBSThroughputExceeded uint64 + EC2IOPSExceeded uint64 + EC2ThroughputExceeded uint64 + QueueLength uint64 + ReservedArea [416]byte + ReadLatency Histogram + WriteLatency Histogram +} + +type Histogram struct { + BinCount uint64 + Bins [64]HistogramBin +} + +type HistogramBin struct { + Lower uint64 + Upper uint64 + Count uint64 +} + +// As defined in <linux/nvme_ioctl.h>. +type nvmePassthruCommand struct { + opcode uint8 + flags uint8 + rsvd1 uint16 + nsid uint32 + cdw2 uint32 + cdw3 uint32 + metadata uint64 + addr uint64 + metadataLen uint32 + dataLen uint32 + cdw10 uint32 + cdw11 uint32 + cdw12 uint32 + cdw13 uint32 + cdw14 uint32 + cdw15 uint32 + timeoutMs uint32 + result uint32 +} + +type NVMECollector struct { + metrics map[string]*prometheus.Desc + csiMountPointPath string + instanceID string + collectionDuration prometheus.Histogram + scrapesTotal prometheus.Counter + scrapeErrorsTotal prometheus.Counter +} + +var ( + ErrInvalidEBSMagic = errors.New("invalid EBS magic number") + ErrParseLogPage = errors.New("failed to parse log page") +) + +// NewNVMECollector creates a new instance of NVMECollector.
+func NewNVMECollector(path, instanceID string) *NVMECollector { + variableLabels := []string{"volume_id"} + constLabels := prometheus.Labels{"instance_id": instanceID} + + return &NVMECollector{ + metrics: map[string]*prometheus.Desc{ + "total_read_ops": prometheus.NewDesc("total_read_ops", "Total number of read operations", variableLabels, constLabels), + "total_write_ops": prometheus.NewDesc("total_write_ops", "Total number of write operations", variableLabels, constLabels), + "total_read_bytes": prometheus.NewDesc("total_read_bytes", "Total number of bytes read", variableLabels, constLabels), + "total_write_bytes": prometheus.NewDesc("total_write_bytes", "Total number of bytes written", variableLabels, constLabels), + "total_read_time": prometheus.NewDesc("total_read_time", "Total time spent on read operations (in microseconds)", variableLabels, constLabels), + "total_write_time": prometheus.NewDesc("total_write_time", "Total time spent on write operations (in microseconds)", variableLabels, constLabels), + "ebs_volume_performance_exceeded_iops": prometheus.NewDesc("ebs_volume_performance_exceeded_iops", "Time EBS volume IOPS limit was exceeded (in microseconds)", variableLabels, constLabels), + "ebs_volume_performance_exceeded_tp": prometheus.NewDesc("ebs_volume_performance_exceeded_tp", "Time EBS volume throughput limit was exceeded (in microseconds)", variableLabels, constLabels), + "ec2_instance_ebs_performance_exceeded_iops": prometheus.NewDesc("ec2_instance_ebs_performance_exceeded_iops", "Time EC2 instance EBS IOPS limit was exceeded (in microseconds)", variableLabels, constLabels), + "ec2_instance_ebs_performance_exceeded_tp": prometheus.NewDesc("ec2_instance_ebs_performance_exceeded_tp", "Time EC2 instance EBS throughput limit was exceeded (in microseconds)", variableLabels, constLabels), + "volume_queue_length": prometheus.NewDesc("volume_queue_length", "Current volume queue length", variableLabels, constLabels), + "read_io_latency_histogram": prometheus.NewDesc("read_io_latency_histogram", "Histogram of read I/O latencies (in microseconds)", variableLabels, constLabels), + "write_io_latency_histogram": prometheus.NewDesc("write_io_latency_histogram", "Histogram of write I/O latencies (in microseconds)", variableLabels, constLabels), + }, + csiMountPointPath: path, + instanceID: instanceID, + collectionDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "nvme_collector_duration_seconds", + Help: "Histogram of NVMe collector duration in seconds", + Buckets: []float64{0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}, + ConstLabels: constLabels, + }), + scrapesTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "nvme_collector_scrapes_total", + Help: "Total number of NVMe collector scrapes", + ConstLabels: constLabels, + }), + scrapeErrorsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "nvme_collector_scrape_errors_total", + Help: "Total number of NVMe collector scrape errors", + ConstLabels: constLabels, + }), + } +} + +func registerNVMECollector(r *metricRecorder, csiMountPointPath, instanceID string) { + collector := NewNVMECollector(csiMountPointPath, instanceID) + r.registry.MustRegister(collector) +} + +// Describe sends the descriptor of each metric in the NVMECollector to Prometheus. 
+func (c *NVMECollector) Describe(ch chan<- *prometheus.Desc) { + for _, desc := range c.metrics { + ch <- desc + } + ch <- c.collectionDuration.Desc() + ch <- c.scrapesTotal.Desc() + ch <- c.scrapeErrorsTotal.Desc() +} + +// Collect is invoked by Prometheus at collection time. +func (c *NVMECollector) Collect(ch chan<- prometheus.Metric) { + c.scrapesTotal.Inc() + start := time.Now() + defer func() { + duration := time.Since(start).Seconds() + c.collectionDuration.Observe(duration) + + ch <- c.collectionDuration + ch <- c.scrapesTotal + ch <- c.scrapeErrorsTotal + }() + + devicePaths, err := getCSIManagedDevices(c.csiMountPointPath) + if err != nil { + klog.Errorf("Error getting NVMe devices: %v", err) + c.scrapeErrorsTotal.Inc() + return + } else if len(devicePaths) == 0 { + klog.V(8).InfoS("No NVMe devices found") + return + } + + devices, err := fetchDevicePathToVolumeIDMapping(devicePaths) + if err != nil { + klog.Errorf("Error getting volume IDs: %v", err) + c.scrapeErrorsTotal.Inc() + return + } + + for devicePath, volumeID := range devices { + data, err := getNVMEMetrics(devicePath) + if err != nil { + klog.Errorf("Error collecting metrics for device %s: %v", devicePath, err) + c.scrapeErrorsTotal.Inc() + continue + } + + metrics, err := parseLogPage(data) + if err != nil { + klog.Errorf("Error parsing metrics for device %s: %v", devicePath, err) + c.scrapeErrorsTotal.Inc() + continue + } + + // Send all collected metrics to Prometheus + ch <- prometheus.MustNewConstMetric(c.metrics["total_read_ops"], prometheus.CounterValue, float64(metrics.ReadOps), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["total_write_ops"], prometheus.CounterValue, float64(metrics.WriteOps), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["total_read_bytes"], prometheus.CounterValue, float64(metrics.ReadBytes), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["total_write_bytes"], prometheus.CounterValue, float64(metrics.WriteBytes), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["total_read_time"], prometheus.CounterValue, float64(metrics.TotalReadTime), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["total_write_time"], prometheus.CounterValue, float64(metrics.TotalWriteTime), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["ebs_volume_performance_exceeded_iops"], prometheus.CounterValue, float64(metrics.EBSIOPSExceeded), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["ebs_volume_performance_exceeded_tp"], prometheus.CounterValue, float64(metrics.EBSThroughputExceeded), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["ec2_instance_ebs_performance_exceeded_iops"], prometheus.CounterValue, float64(metrics.EC2IOPSExceeded), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["ec2_instance_ebs_performance_exceeded_tp"], prometheus.CounterValue, float64(metrics.EC2ThroughputExceeded), volumeID) + ch <- prometheus.MustNewConstMetric(c.metrics["volume_queue_length"], prometheus.GaugeValue, float64(metrics.QueueLength), volumeID) + + // Read Latency Histogram + readCount, readBuckets := convertHistogram(metrics.ReadLatency) + ch <- prometheus.MustNewConstHistogram( + c.metrics["read_io_latency_histogram"], + readCount, + 0, + readBuckets, + volumeID, + ) + + // Write Latency Histogram + writeCount, writeBuckets := convertHistogram(metrics.WriteLatency) + ch <- prometheus.MustNewConstHistogram( + c.metrics["write_io_latency_histogram"], + writeCount, + 0, + writeBuckets, + volumeID, + ) + } +} + +// convertHistogram converts 
the Histogram structure to a format suitable for Prometheus histogram metrics. +func convertHistogram(hist Histogram) (uint64, map[float64]uint64) { + var count uint64 + buckets := make(map[float64]uint64) + + for i := uint64(0); i < hist.BinCount && i < 64; i++ { + count += hist.Bins[i].Count + buckets[float64(hist.Bins[i].Upper)] = count + } + + return count, buckets +} + +// getNVMEMetrics retrieves NVMe metrics by reading the log page from the NVMe device at the given path. +func getNVMEMetrics(devicePath string) ([]byte, error) { + f, err := os.OpenFile(devicePath, os.O_RDWR, 0) + if err != nil { + return nil, fmt.Errorf("getNVMEMetrics: error opening device: %w", err) + } + defer f.Close() + + data, err := nvmeReadLogPage(f.Fd(), 0xD0) + if err != nil { + return nil, fmt.Errorf("getNVMEMetrics: error reading log page %w", err) + } + + return data, nil +} + +// nvmeReadLogPage reads an NVMe log page via an ioctl system call. +func nvmeReadLogPage(fd uintptr, logID uint8) ([]byte, error) { + data := make([]byte, 4096) // 4096 bytes is the length of the log page. + bufferLen := len(data) + + if bufferLen > math.MaxUint32 { + return nil, errors.New("nvmeReadLogPage: bufferLen exceeds MaxUint32") + } + + cmd := nvmePassthruCommand{ + opcode: 0x02, + addr: uint64(uintptr(unsafe.Pointer(&data[0]))), + nsid: 1, + dataLen: uint32(bufferLen), + cdw10: uint32(logID) | (1024 << 16), + } + + status, _, errno := unix.Syscall(unix.SYS_IOCTL, fd, 0xC0484E41, uintptr(unsafe.Pointer(&cmd))) + if errno != 0 { + return nil, fmt.Errorf("nvmeReadLogPage: ioctl error %w", errno) + } + if status != 0 { + return nil, fmt.Errorf("nvmeReadLogPage: ioctl command failed with status %d", status) + } + return data, nil +} + +// parseLogPage parses the binary data from an EBS log page into EBSMetrics. +func parseLogPage(data []byte) (EBSMetrics, error) { + var metrics EBSMetrics + reader := bytes.NewReader(data) + + if err := binary.Read(reader, binary.LittleEndian, &metrics); err != nil { + return EBSMetrics{}, fmt.Errorf("%w: %w", ErrParseLogPage, err) + } + + if metrics.EBSMagic != 0x3C23B510 { + return EBSMetrics{}, fmt.Errorf("%w: %x", ErrInvalidEBSMagic, metrics.EBSMagic) + } + + return metrics, nil +} + +// getCSIManagedDevices returns a slice of unique device paths for NVMe devices mounted under the given path. 
+func getCSIManagedDevices(path string) ([]string, error) { + if len(path) == 0 { + klog.V(4).InfoS("getCSIManagedDevices: empty path provided, no devices will be matched") + return []string{}, nil + } + + deviceMap := make(map[string]bool) + + // Read /proc/self/mountinfo to identify NVMe devices + mountinfo, err := os.ReadFile("/proc/self/mountinfo") + if err != nil { + return nil, fmt.Errorf("getCSIManagedDevices: error reading mountinfo: %w", err) + } + + lines := strings.Split(string(mountinfo), "\n") + for _, line := range lines { + fields := strings.Fields(line) + + // https://man7.org/linux/man-pages/man5/proc.5.html + if len(fields) < 10 { + continue // Skip lines with insufficient fields + } + + mountPoint := fields[4] + if !strings.HasPrefix(mountPoint, path) { + continue + } + + // Check mount source (field 3) for directly mounted NVMe devices + m := fields[3] + if strings.HasPrefix(m, "/nvme") { + device := "/dev" + m + deviceMap[device] = true + } + + // Check root (field 9) for block devices + r := fields[9] + if strings.HasPrefix(r, "/dev/nvme") { + deviceMap[r] = true + } + } + + devices := make([]string, 0, len(deviceMap)) + for device := range deviceMap { + devices = append(devices, device) + } + + return devices, nil +} + +type BlockDevice struct { + Name string `json:"name"` + Serial string `json:"serial"` +} + +type LsblkOutput struct { + BlockDevices []BlockDevice `json:"blockdevices"` +} + +// mapDevicePathsToVolumeIDs takes a list of device paths and lsblk output, and returns a map of device paths to volume IDs. +func mapDevicePathsToVolumeIDs(devicePaths []string, lsblkOutput []byte) (map[string]string, error) { + m := make(map[string]string) + + var lsblkData LsblkOutput + if err := json.Unmarshal(lsblkOutput, &lsblkData); err != nil { + return nil, fmt.Errorf("mapDevicePathsToVolumeIDs: error unmarshaling JSON: %w", err) + } + + for _, device := range lsblkData.BlockDevices { + devicePath := "/dev/" + device.Name + + for _, path := range devicePaths { + if strings.HasPrefix(path, devicePath) { + volumeID := device.Serial + + if strings.HasPrefix(volumeID, "vol") && !strings.HasPrefix(volumeID, "vol-") { + volumeID = "vol-" + volumeID[3:] + } + + m[path] = volumeID + break + } + } + } + + return m, nil +} + +func executeLsblk() ([]byte, error) { + cmd := exec.Command("lsblk", "-nd", "--json", "-o", "NAME,SERIAL") + output, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("executeLsblk: error running lsblk: %w", err) + } + return output, nil +} + +func fetchDevicePathToVolumeIDMapping(devicePaths []string) (map[string]string, error) { + output, err := executeLsblk() + if err != nil { + return nil, fmt.Errorf("fetchDevicePathToVolumeIDMapping: %w", err) + } + + return mapDevicePathsToVolumeIDs(devicePaths, output) +} diff --git a/pkg/metrics/nvme_test.go b/pkg/metrics/nvme_test.go new file mode 100644 index 0000000000..7ed70d4186 --- /dev/null +++ b/pkg/metrics/nvme_test.go @@ -0,0 +1,330 @@ +// Copyright 2024 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the 'License'); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux +// +build linux + +package metrics + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "reflect" + "strings" + "testing" +) + +func TestNewNVMECollector(t *testing.T) { + testPath := "/test/path" + testInstanceID := "test-instance-1" + + collector := NewNVMECollector(testPath, testInstanceID) + + if collector == nil { + t.Fatal("NewNVMECollector returned nil") + } + + if collector.csiMountPointPath != testPath { + t.Errorf("csiMountPointPath = %v, want %v", collector.csiMountPointPath, testPath) + } + + if collector.instanceID != testInstanceID { + t.Errorf("instanceID = %v, want %v", collector.instanceID, testInstanceID) + } + + expectedMetrics := []string{ + "total_read_ops", + "total_write_ops", + "total_read_bytes", + "total_write_bytes", + "total_read_time", + "total_write_time", + "ebs_volume_performance_exceeded_iops", + "ebs_volume_performance_exceeded_tp", + "ec2_instance_ebs_performance_exceeded_iops", + "ec2_instance_ebs_performance_exceeded_tp", + "volume_queue_length", + "read_io_latency_histogram", + "write_io_latency_histogram", + } + + for _, metricName := range expectedMetrics { + if _, exists := collector.metrics[metricName]; !exists { + t.Errorf("metric %s not found in collector.metrics", metricName) + } + } +} + +func TestConvertHistogram(t *testing.T) { + tests := []struct { + name string + histogram Histogram + wantCount uint64 + wantBuckets map[float64]uint64 + }{ + { + name: "standard histogram with multiple bins", + histogram: Histogram{ + BinCount: 3, + Bins: [64]HistogramBin{ + {Lower: 0, Upper: 100, Count: 5}, + {Lower: 100, Upper: 200, Count: 3}, + {Lower: 200, Upper: 300, Count: 2}, + }, + }, + wantCount: 10, + wantBuckets: map[float64]uint64{ + 100: 5, + 200: 8, + 300: 10, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotCount, gotBuckets := convertHistogram(tt.histogram) + + if gotCount != tt.wantCount { + t.Errorf("convertHistogram() count = %v, want %v", gotCount, tt.wantCount) + } + + if !reflect.DeepEqual(gotBuckets, tt.wantBuckets) { + t.Errorf("convertHistogram() buckets = %v, want %v", gotBuckets, tt.wantBuckets) + } + }) + } +} + +func TestParseLogPage(t *testing.T) { + tests := []struct { + name string + input []byte + want EBSMetrics + wantErr string + }{ + { + name: "valid log page", + input: func() []byte { + metrics := EBSMetrics{ + EBSMagic: 0x3C23B510, + ReadOps: 100, + WriteOps: 200, + ReadBytes: 1024, + WriteBytes: 2048, + TotalReadTime: 5000, + TotalWriteTime: 6000, + EBSIOPSExceeded: 10, + EBSThroughputExceeded: 20, + } + buf := new(bytes.Buffer) + if err := binary.Write(buf, binary.LittleEndian, metrics); err != nil { + t.Fatalf("failed to create test data: %v", err) + } + return buf.Bytes() + }(), + want: EBSMetrics{ + EBSMagic: 0x3C23B510, + ReadOps: 100, + WriteOps: 200, + ReadBytes: 1024, + WriteBytes: 2048, + TotalReadTime: 5000, + TotalWriteTime: 6000, + EBSIOPSExceeded: 10, + EBSThroughputExceeded: 20, + }, + wantErr: "", + }, + { + name: "invalid magic number", + input: func() []byte { + metrics := EBSMetrics{ + EBSMagic: 0x12345678, + } + buf := new(bytes.Buffer) + if err := binary.Write(buf, binary.LittleEndian, metrics); err != nil { + t.Fatalf("failed to create test data: %v", err) + } + return buf.Bytes() + }(), + want: EBSMetrics{}, + wantErr: ErrInvalidEBSMagic.Error(), + }, + { + name: "empty data", + input: []byte{}, + want: EBSMetrics{}, + 
wantErr: ErrParseLogPage.Error(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseLogPage(tt.input) + + if tt.wantErr != "" { + if err == nil { + t.Errorf("parseLogPage() error = nil, wantErr %v", tt.wantErr) + return + } + if !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("parseLogPage() error = %v, wantErr %v", err, tt.wantErr) + return + } + return + } + + if err != nil { + t.Errorf("parseLogPage() unexpected error = %v", err) + return + } + + if got.EBSMagic != tt.want.EBSMagic { + t.Errorf("parseLogPage() magic number = %x, want %x", got.EBSMagic, tt.want.EBSMagic) + } + if got.ReadOps != tt.want.ReadOps { + t.Errorf("parseLogPage() ReadOps = %v, want %v", got.ReadOps, tt.want.ReadOps) + } + if got.WriteOps != tt.want.WriteOps { + t.Errorf("parseLogPage() WriteOps = %v, want %v", got.WriteOps, tt.want.WriteOps) + } + if got.ReadBytes != tt.want.ReadBytes { + t.Errorf("parseLogPage() ReadBytes = %v, want %v", got.ReadBytes, tt.want.ReadBytes) + } + if got.WriteBytes != tt.want.WriteBytes { + t.Errorf("parseLogPage() WriteBytes = %v, want %v", got.WriteBytes, tt.want.WriteBytes) + } + }) + } +} + +func TestMapDevicePathsToVolumeIDs(t *testing.T) { + tests := []struct { + name string + devicePaths []string + lsblkOutput []byte + want map[string]string + wantErr bool + }{ + { + name: "standard device mapping", + devicePaths: []string{ + "/dev/nvme1n1", + "/dev/nvme2n1", + }, + lsblkOutput: func() []byte { + data := LsblkOutput{ + BlockDevices: []BlockDevice{ + {Name: "nvme1n1", Serial: "vol-123456789"}, + {Name: "nvme2n1", Serial: "vol-987654321"}, + }, + } + b, err := json.Marshal(data) + if err != nil { + t.Fatalf("failed to create test data: %v", err) + } + return b + }(), + want: map[string]string{ + "/dev/nvme1n1": "vol-123456789", + "/dev/nvme2n1": "vol-987654321", + }, + wantErr: false, + }, + { + name: "device without hyphen", + devicePaths: []string{ + "/dev/nvme1n1", + }, + lsblkOutput: func() []byte { + data := LsblkOutput{ + BlockDevices: []BlockDevice{ + {Name: "nvme1n1", Serial: "vol123456789"}, + }, + } + b, err := json.Marshal(data) + if err != nil { + t.Fatalf("failed to create test data: %v", err) + } + return b + }(), + want: map[string]string{ + "/dev/nvme1n1": "vol-123456789", + }, + wantErr: false, + }, + { + name: "empty device paths", + devicePaths: []string{}, + lsblkOutput: func() []byte { + data := LsblkOutput{ + BlockDevices: []BlockDevice{ + {Name: "nvme1n1", Serial: "vol-123456789"}, + }, + } + b, err := json.Marshal(data) + if err != nil { + t.Fatalf("failed to create test data: %v", err) + } + return b + }(), + want: map[string]string{}, + wantErr: false, + }, + { + name: "invalid json", + devicePaths: []string{"/dev/nvme1n1"}, + lsblkOutput: []byte(`invalid json`), + want: nil, + wantErr: true, + }, + { + name: "no matching devices", + devicePaths: []string{ + "/dev/nvme3n1", + }, + lsblkOutput: func() []byte { + data := LsblkOutput{ + BlockDevices: []BlockDevice{ + {Name: "nvme1n1", Serial: "vol-123456789"}, + }, + } + b, err := json.Marshal(data) + if err != nil { + t.Fatalf("failed to create test data: %v", err) + } + return b + }(), + want: map[string]string{}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := mapDevicePathsToVolumeIDs(tt.devicePaths, tt.lsblkOutput) + + if (err != nil) != tt.wantErr { + t.Errorf("mapDevicePathsToVolumeIDs() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !reflect.DeepEqual(got, tt.want) { 
+ t.Errorf("mapDevicePathsToVolumeIDs() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/metrics/nvme_windows.go b/pkg/metrics/nvme_windows.go new file mode 100644 index 0000000000..e920f73ae1 --- /dev/null +++ b/pkg/metrics/nvme_windows.go @@ -0,0 +1,24 @@ +// Copyright 2024 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the 'License'); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build windows +// +build windows + +package metrics + +import "k8s.io/klog/v2" + +func registerNVMECollector(_ *metricRecorder, _, _ string) { + klog.InfoS("NVMe metric collection is not supported on windows") +} diff --git a/tests/e2e/nvme_metrics.go b/tests/e2e/nvme_metrics.go new file mode 100644 index 0000000000..397c13d890 --- /dev/null +++ b/tests/e2e/nvme_metrics.go @@ -0,0 +1,171 @@ +// Copyright 2024 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the 'License'); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "io" + "net/http" + "time" + + awscloud "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + ebscsidriver "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/tests/e2e/driver" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/tests/e2e/testsuites" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/e2e/framework/kubectl" + admissionapi "k8s.io/pod-security-admission/api" +) + +var _ = Describe("[ebs-csi-e2e] [single-az] NVMe Metrics", func() { + f := framework.NewDefaultFramework("ebs") + f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged + + var ( + cs clientset.Interface + ns *v1.Namespace + ebsDriver driver.PVTestDriver + ) + + BeforeEach(func() { + cs = f.ClientSet + ns = f.Namespace + ebsDriver = driver.InitEbsCSIDriver() + }) + + It("should create a stateful pod and read NVMe metrics from node plugin", func() { + pod := testsuites.PodDetails{ + Cmd: "while true; do echo $(date -u) >> /mnt/test-1/out.txt; sleep 5; done", + Volumes: []testsuites.VolumeDetails{ + { + CreateVolumeParameters: map[string]string{ + ebscsidriver.VolumeTypeKey: awscloud.VolumeTypeGP3, + ebscsidriver.FSTypeKey: ebscsidriver.FSTypeExt4, + }, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeGP3), + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + }, + }, + } + + By("Setting up pod and volumes") + tpod, _ := pod.SetupWithDynamicVolumes(cs, ns, ebsDriver) + defer tpod.Cleanup() + + By("Creating the stateful pod") + tpod.Create() + tpod.WaitForRunning() + + By("Getting name of the node where test pod is running") + p, err := cs.CoreV1().Pods(ns.Name).Get(context.TODO(), tpod.GetName(), metav1.GetOptions{}) + framework.ExpectNoError(err, "Failed to get pod") + nodeName := p.Spec.NodeName + Expect(nodeName).NotTo(BeEmpty(), "Node name should not be empty") + + By("Finding the CSI node plugin on the node where test pod is running") + csiPods, err := cs.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{ + LabelSelector: "app=ebs-csi-node", + }) + framework.ExpectNoError(err, "Failed to get CSI node pods") + + var csiNodePod *v1.Pod + for _, pod := range csiPods.Items { + if pod.Spec.NodeName == nodeName { + csiNodePod = &pod + break + } + } + Expect(csiNodePod).NotTo(BeNil(), "No CSI node pod found") + + By("Setting up port-forwarding") + args := []string{"port-forward", "-n", "kube-system", csiNodePod.Name, "3302:3302"} + + go kubectl.RunKubectlOrDie("kube-system", args...) 
+ + By("Getting metrics from endpoint") + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + metricsOutput, err := getMetricsWithRetry(ctx) + framework.ExpectNoError(err, "Failed to get metrics after retries") + + By("Verifying NVMe metrics") + + expectedMetrics := []string{ + "total_read_ops", + "total_write_ops", + "total_read_bytes", + "total_write_bytes", + "total_read_time", + "total_write_time", + "ebs_volume_performance_exceeded_iops", + "ebs_volume_performance_exceeded_tp", + "ec2_instance_ebs_performance_exceeded_iops", + "ec2_instance_ebs_performance_exceeded_tp", + "volume_queue_length", + "read_io_latency_histogram", + "write_io_latency_histogram", + } + + for _, metric := range expectedMetrics { + Expect(metricsOutput).To(ContainSubstring(metric), + fmt.Sprintf("Metric %s not found in response", metric)) + } + }) +}) + +func getMetricsWithRetry(ctx context.Context) (string, error) { + var metricsOutput string + + backoff := wait.Backoff{ + Duration: 5 * time.Second, + Factor: 2.0, + Steps: 5, + } + + err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:3302/metrics", nil) + if err != nil { + return false, fmt.Errorf("failed to create request: %w", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + framework.Logf("Failed to get metrics: %v, retrying...", err) + return false, nil + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + framework.Logf("Failed to read metrics: %v, retrying...", err) + return false, nil + } + + metricsOutput = string(body) + return true, nil + }) + + return metricsOutput, err +} diff --git a/tests/e2e/testsuites/testsuites.go b/tests/e2e/testsuites/testsuites.go index 32ffe0f43b..a99c756197 100644 --- a/tests/e2e/testsuites/testsuites.go +++ b/tests/e2e/testsuites/testsuites.go @@ -654,6 +654,10 @@ func (t *TestPod) Create() { framework.ExpectNoError(err) } +func (t *TestPod) GetName() string { + return t.pod.Name +} + func (t *TestPod) WaitForSuccess() { err := e2epod.WaitForPodSuccessInNamespace(context.Background(), t.client, t.pod.Name, t.namespace.Name) framework.ExpectNoError(err)
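Reviewer note (not part of the diff): the standalone Go sketch below illustrates the two decoding steps this PR relies on — parsing the little-endian 0xD0 EBS log page header and folding the per-bin latency counts into cumulative Prometheus-style buckets, as parseLogPage and convertHistogram do in pkg/metrics/nvme.go. The ebsHeader type is a hypothetical, trimmed stand-in for EBSMetrics (only its first three fields); the real collector reads the full 4096-byte page from the device via the NVMe admin-command ioctl.

// logpage_sketch.go — illustrative only; mirrors parseLogPage/convertHistogram on synthetic data.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// ebsHeader is a cut-down, hypothetical stand-in for EBSMetrics.
type ebsHeader struct {
	EBSMagic uint64
	ReadOps  uint64
	WriteOps uint64
}

type histogramBin struct{ Lower, Upper, Count uint64 }

type histogram struct {
	BinCount uint64
	Bins     [64]histogramBin
}

// convertHistogram mirrors the PR: each bin's upper bound becomes a cumulative bucket boundary.
func convertHistogram(h histogram) (uint64, map[float64]uint64) {
	var count uint64
	buckets := make(map[float64]uint64)
	for i := uint64(0); i < h.BinCount && i < 64; i++ {
		count += h.Bins[i].Count
		buckets[float64(h.Bins[i].Upper)] = count
	}
	return count, buckets
}

func main() {
	// Encode a fake log page header the same way the unit tests do, then decode it
	// and validate the EBS magic number, as parseLogPage does.
	buf := new(bytes.Buffer)
	_ = binary.Write(buf, binary.LittleEndian, ebsHeader{EBSMagic: 0x3C23B510, ReadOps: 100, WriteOps: 200})

	var decoded ebsHeader
	_ = binary.Read(bytes.NewReader(buf.Bytes()), binary.LittleEndian, &decoded)
	fmt.Printf("magic ok=%v read_ops=%d write_ops=%d\n", decoded.EBSMagic == 0x3C23B510, decoded.ReadOps, decoded.WriteOps)

	// Bins of 5, 3 and 2 observations become cumulative buckets 5, 8 and 10,
	// matching the expectation in TestConvertHistogram.
	h := histogram{BinCount: 3}
	h.Bins[0] = histogramBin{Lower: 0, Upper: 100, Count: 5}
	h.Bins[1] = histogramBin{Lower: 100, Upper: 200, Count: 3}
	h.Bins[2] = histogramBin{Lower: 200, Upper: 300, Count: 2}
	total, buckets := convertHistogram(h)
	fmt.Println(total, buckets) // 10 map[100:5 200:8 300:10]
}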