Skip to content

Commit

Permalink
Add drive I/O metrics for Prometheus
Browse files Browse the repository at this point in the history
Below metrics are exported:
* directpv_stats_drive_ready
* directpv_stats_drive_total_bytes_read
* directpv_stats_drive_total_bytes_written
* directpv_stats_drive_read_latency_seconds
* directpv_stats_drive_write_latency_seconds
* directpv_stats_drive_read_throughput_bytes_per_second
* directpv_stats_drive_write_throughput_bytes_per_second
* directpv_stats_drive_wait_time_seconds

Signed-off-by: Bala.FA <[email protected]>
  • Loading branch information
balamurugana committed Oct 3, 2024
1 parent 557e925 commit 232229b
Show file tree
Hide file tree
Showing 3 changed files with 372 additions and 5 deletions.
48 changes: 48 additions & 0 deletions pkg/device/sysfs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package device

import (
"bufio"
"errors"
"fmt"
"os"
"path"
"strconv"
"strings"
)
Expand Down Expand Up @@ -106,3 +108,49 @@ func getHolders(name string) ([]string, error) {
func getDMName(name string) (string, error) {
return readFirstLine("/sys/class/block/" + name + "/dm/name")
}

// GetStat returns statistics for a given device name.
func GetStat(name string) (stats []uint64, err error) {
line, err := readFirstLine("/sys/class/block/" + name + "/stat")
if err != nil {
return nil, err
}

for _, token := range strings.Split(line, " ") {
token = strings.TrimSpace(token)
ui64, err := strconv.ParseUint(token, 10, 64)
if err != nil {
return nil, err
}
stats = append(stats, ui64)
}

return stats, nil
}

// GetHardwareSectorSize returns hardware sector size of associated drive.
func GetHardwareSectorSize(name string) (uint64, error) {
if _, err := os.Lstat("/sys/block/" + name); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return 0, err
}

partPath := "/sys/class/block/" + name
if _, err = os.Stat(partPath + "/partition"); err != nil {
return 0, err
}

linkPath, err := os.Readlink(partPath)
if err != nil {
return 0, err
}

name = path.Base(path.Dir(linkPath))
}

s, err := readFirstLine("/sys/block/" + name + "/queue/hw_sector_size")
if err != nil || s == "" {
return 0, err
}
return strconv.ParseUint(s, 10, 64)
}
163 changes: 160 additions & 3 deletions pkg/metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,57 @@ package metrics

import (
"context"
"fmt"

directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/device"
"github.com/minio/directpv/pkg/sys"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/utils"
"github.com/minio/directpv/pkg/xfs"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/klog/v2"
)

type driveStats struct {
readSectorBytes float64
readTicks float64
writeSectorBytes float64
writeTicks float64
timeInQueue float64
}

func getDriveStats(driveName string) (*driveStats, error) {
stat, err := device.GetStat(driveName)
switch {
case err != nil:
return nil, err
case len(stat) == 0:
return nil, fmt.Errorf("unable to read stat from drive %v", driveName)
case len(stat) < 10:
return nil, fmt.Errorf("invalid stat format from drive %v", driveName)
}

hardwareSectorSize, err := device.GetHardwareSectorSize(driveName)
switch {
case err != nil:
return nil, err
case hardwareSectorSize == 0:
hardwareSectorSize = 512 // Use default value
}

// Refer https://www.kernel.org/doc/Documentation/block/stat.txt for meaning of each field.
return &driveStats{
readSectorBytes: float64(stat[2] * hardwareSectorSize),
readTicks: float64(stat[3]),
writeSectorBytes: float64(stat[6] * hardwareSectorSize),
writeTicks: float64(stat[7]),
timeInQueue: float64(stat[9]),
}, nil
}

type metricsCollector struct {
nodeID directpvtypes.NodeID
desc *prometheus.Desc
Expand Down Expand Up @@ -95,21 +135,138 @@ func (c *metricsCollector) publishVolumeStats(ctx context.Context, volume *types
)
}

func (c *metricsCollector) publishDriveStats(drive *types.Drive, ch chan<- prometheus.Metric) {
deviceID, err := c.getDeviceByFSUUID(drive.Status.FSUUID)
if err != nil {
klog.ErrorS(
err,
"unable to find device by FSUUID; "+
"either device is removed or run command "+
"`sudo udevadm control --reload-rules && sudo udevadm trigger`"+
" on the host to reload",
"FSUUID", drive.Status.FSUUID)
client.Eventf(
drive, client.EventTypeWarning, client.EventReasonMetrics,
"unable to find device by FSUUID %v; "+
"either device is removed or run command "+
"`sudo udevadm control --reload-rules && sudo udevadm trigger`"+
" on the host to reload", drive.Status.FSUUID)

return
}
deviceName := utils.TrimDevPrefix(deviceID)

status := float64(1) // Online
driveStat, err := getDriveStats(deviceName)
if err != nil {
klog.ErrorS(err, "unable to read drive statistics")
status = float64(0) // Offline
}

// Metrics
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(consts.AppName, "stats", "drive_ready"),
"Drive Online/Offline Status",
[]string{"drive"}, nil),
prometheus.GaugeValue,
status, drive.Name,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(consts.AppName, "stats", "drive_total_bytes_read"),
"Total number of bytes read from the drive",
[]string{"drive"}, nil),
prometheus.GaugeValue,
driveStat.readSectorBytes, drive.Name,
)

ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(consts.AppName, "stats", "drive_total_bytes_written"),
"Total number of bytes written to the drive",
[]string{"drive"}, nil),
prometheus.GaugeValue,
driveStat.writeSectorBytes, drive.Name,
)

// Drive Read/Write Latency
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(consts.AppName, "stats", "drive_read_latency_seconds"),
"Drive Read Latency",
[]string{"drive"}, nil),
prometheus.GaugeValue,
driveStat.readTicks/1000, drive.Name,
)

ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(consts.AppName, "stats", "drive_write_latency_seconds"),
"Drive Write Latency",
[]string{"drive"}, nil),
prometheus.GaugeValue,
driveStat.writeTicks/1000, drive.Name,
)

// Drive Read/Write Throughput
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(consts.AppName, "stats", "drive_read_throughput_bytes_per_second"),
"Drive Read Throughput",
[]string{"drive"}, nil),
prometheus.GaugeValue,
1000*driveStat.readSectorBytes/driveStat.readTicks, drive.Name,
)

ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(consts.AppName, "stats", "drive_write_throughput_bytes_per_second"),
"Drive Write Throughput",
[]string{"drive"}, nil),
prometheus.GaugeValue,
1000*driveStat.writeSectorBytes/driveStat.writeTicks, drive.Name,
)

// Wait Time
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(consts.AppName, "stats", "drive_wait_time_seconds"),
"Drive Wait Time",
[]string{"drive"}, nil),
prometheus.GaugeValue,
driveStat.timeInQueue/1000, drive.Name,
)
}

// Collect is called by Prometheus registry when collecting metrics.
func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

resultCh := client.NewVolumeLister().
// Collecting volume statistics
volumeResultCh := client.NewVolumeLister().
NodeSelector([]directpvtypes.LabelValue{directpvtypes.ToLabelValue(string(c.nodeID))}).
List(ctx)
for result := range resultCh {
for result := range volumeResultCh {
if result.Err != nil {
return
break
}

if result.Volume.Status.TargetPath != "" {
c.publishVolumeStats(ctx, &result.Volume, ch)
}
}

// Collecting drive statistics
driveResultCh := client.NewDriveLister().
NodeSelector([]directpvtypes.LabelValue{directpvtypes.ToLabelValue(string(c.nodeID))}).
List(ctx)
for result := range driveResultCh {
if result.Err != nil {
break
}

c.publishDriveStats(&result.Drive, ch)
}
}
Loading

0 comments on commit 232229b

Please sign in to comment.