Skip to content

Commit

Permalink
feat(metrics): emit metrics for outdated/missing malachite data
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-lgy committed Feb 5, 2025
1 parent fb6d1d8 commit 6eb5137
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package client
import (
"fmt"
"sync"
"time"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

const (
Expand All @@ -37,6 +39,19 @@ const (
RealtimePowerResource = "realtime/power"
)

const (
UpdateTimeout = 30 * time.Second
RealtimeUpdateTimeout = 5 * time.Second
)

const (
metricMalachiteSystemStatsOutdated = "malachite_system_stats_outdated"
metricMalachiteContainerStatsOutdated = "malachite_container_stats_outdated"
metricMalachiteCgroupStatsOutdated = "malachite_cgroup_stats_outdated"

metricMalachiteContainerStatsMissing = "malachite_container_stats_missing"
)

type SystemResourceKind int

const (
Expand All @@ -52,10 +67,11 @@ type MalachiteClient struct {
urls map[string]string
relativePathFunc *func(podUID, containerId string) (string, error)

emitter metrics.MetricEmitter
fetcher pod.PodFetcher
}

func NewMalachiteClient(fetcher pod.PodFetcher) *MalachiteClient {
func NewMalachiteClient(fetcher pod.PodFetcher, emitter metrics.MetricEmitter) *MalachiteClient {
urls := make(map[string]string)
for _, path := range []string{
CgroupResource,
Expand All @@ -70,6 +86,7 @@ func NewMalachiteClient(fetcher pod.PodFetcher) *MalachiteClient {

return &MalachiteClient{
fetcher: fetcher,
emitter: emitter,
urls: urls,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

func (c *MalachiteClient) GetCgroupStats(cgroupPath string) (*types.MalachiteCgroupInfo, error) {
Expand Down Expand Up @@ -76,6 +79,7 @@ func (c *MalachiteClient) GetCgroupStats(cgroupPath string) (*types.MalachiteCgr
return nil, fmt.Errorf("unknow cgroup type %s in cgroup info", cgroupInfo.CgroupType)
}

c.checkCgroupStatsOutdated(cgroupPath, cgroupInfo)
return cgroupInfo, nil
}

Expand Down Expand Up @@ -109,3 +113,54 @@ func (c *MalachiteClient) getCgroupStats(cgroupPath string) ([]byte, error) {

return ioutil.ReadAll(rsp.Body)
}

func (c *MalachiteClient) checkCgroupStatsOutdated(cgroupPath string, stats *types.MalachiteCgroupInfo) {
checkAndEmit := func(updateTimestamp int64, statsType string) {
updateTime := time.Unix(updateTimestamp, 0)
if time.Since(updateTime) <= UpdateTimeout {
return
}

general.Warningf(
"malachite cgroup %s stats outdated, cgroup %s, last update time %s",
statsType, cgroupPath, updateTime)
_ = c.emitter.StoreInt64(metricMalachiteCgroupStatsOutdated, 1, metrics.MetricTypeNameCount,
metrics.MetricTag{Key: "type", Val: statsType},
metrics.MetricTag{Key: "cgroupPath", Val: cgroupPath},
)
}

if stats.CgroupType == "V1" && stats.V1 != nil {
if stats.V1.Cpu != nil {
checkAndEmit(stats.V1.Cpu.UpdateTime, "cpu")
}
if stats.V1.Memory != nil {
checkAndEmit(stats.V1.Memory.UpdateTime, "memory")
}
if stats.V1.CpuSet != nil {
checkAndEmit(stats.V1.CpuSet.UpdateTime, "cpuset")
}
if stats.V1.Blkio != nil {
checkAndEmit(stats.V1.Blkio.UpdateTime, "blkio")
}
if stats.V1.NetCls != nil {
checkAndEmit(stats.V1.NetCls.UpdateTime, "netcls")
}
} else if stats.CgroupType == "V2" && stats.V2 != nil {
if stats.V2.Cpu != nil {
checkAndEmit(stats.V2.Cpu.UpdateTime, "cpu")
}
if stats.V2.Memory != nil {
checkAndEmit(stats.V2.Memory.UpdateTime, "memory")
}
if stats.V2.CpuSet != nil {
checkAndEmit(stats.V2.CpuSet.UpdateTime, "cpuset")
}
if stats.V2.Blkio != nil {
checkAndEmit(stats.V2.Blkio.UpdateTime, "blkio")
}
if stats.V2.NetCls != nil {
checkAndEmit(stats.V2.NetCls.UpdateTime, "netcls")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

var (
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestGetCgroupStats(t *testing.T) {
}))
defer server.Close()

malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{})
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{}, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
CgroupResource: server.URL,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package client
import (
"context"
"fmt"
"time"

v1 "k8s.io/api/core/v1"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
cgroupcm "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -59,7 +61,11 @@ func (c *MalachiteClient) GetPodStats(ctx context.Context, podUID string) (map[s
containerID := native.TrimContainerIDPrefix(containerStatus.ContainerID)
stats, err := c.GetPodContainerStats(podUID, containerID)
if err != nil {
general.Errorf("GetPodStats err %v", err)
general.Errorf("failed to get pod %s container %s stats, err %v", podUID, containerID, err)
_ = c.emitter.StoreInt64(metricMalachiteContainerStatsMissing, 1, metrics.MetricTypeNameCount,
metrics.MetricTag{Key: "podUID", Val: podUID},
metrics.MetricTag{Key: "containerID", Val: containerID},
)
continue
}
containersStats[containerStatus.Name] = stats
Expand All @@ -85,5 +91,62 @@ func (c *MalachiteClient) GetPodContainerStats(podUID, containerID string) (*typ
if err != nil {
return nil, fmt.Errorf("GetPodContainerStats %s/%v get-status %v err %v", podUID, containerID, cgroupPath, err)
}

c.checkContainerStatsOutdated(podUID, containerID, containersStats)
return containersStats, nil
}

func (c *MalachiteClient) checkContainerStatsOutdated(
podUID, containerID string,
stats *types.MalachiteCgroupInfo,
) {
checkAndEmit := func(updateTimestamp int64, statsType string) {
updateTime := time.Unix(updateTimestamp, 0)
if time.Since(updateTime) <= UpdateTimeout {
return
}

general.Warningf(
"malachite container %s stats outdated, pod %s container %s, last update time %s",
statsType, podUID, containerID, updateTime)
_ = c.emitter.StoreInt64(metricMalachiteContainerStatsOutdated, 1, metrics.MetricTypeNameCount,
metrics.MetricTag{Key: "type", Val: statsType},
metrics.MetricTag{Key: "podUID", Val: podUID},
metrics.MetricTag{Key: "containerID", Val: containerID},
)
}

if stats.CgroupType == "V1" && stats.V1 != nil {
if stats.V1.Cpu != nil {
checkAndEmit(stats.V1.Cpu.UpdateTime, "cpu")
}
if stats.V1.Memory != nil {
checkAndEmit(stats.V1.Memory.UpdateTime, "memory")
}
if stats.V1.CpuSet != nil {
checkAndEmit(stats.V1.CpuSet.UpdateTime, "cpuset")
}
if stats.V1.Blkio != nil {
checkAndEmit(stats.V1.Blkio.UpdateTime, "blkio")
}
if stats.V1.NetCls != nil {
checkAndEmit(stats.V1.NetCls.UpdateTime, "netcls")
}
} else if stats.CgroupType == "V2" && stats.V2 != nil {
if stats.V2.Cpu != nil {
checkAndEmit(stats.V2.Cpu.UpdateTime, "cpu")
}
if stats.V2.Memory != nil {
checkAndEmit(stats.V2.Memory.UpdateTime, "memory")
}
if stats.V2.CpuSet != nil {
checkAndEmit(stats.V2.CpuSet.UpdateTime, "cpuset")
}
if stats.V2.Blkio != nil {
checkAndEmit(stats.V2.Blkio.UpdateTime, "blkio")
}
if stats.V2.NetCls != nil {
checkAndEmit(stats.V2.NetCls.UpdateTime, "netcls")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
)

Expand Down Expand Up @@ -153,7 +154,7 @@ func TestGetPodContainerStats(t *testing.T) {
}
fetcher := &pod.PodFetcherStub{PodList: pods}

malachiteClient := NewMalachiteClient(fetcher)
malachiteClient := NewMalachiteClient(fetcher, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
CgroupResource: server.URL,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *MalachiteClient) GetPowerData() (*types.PowerData, error) {
return nil, fmt.Errorf("system compute stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutdated("power", RealtimeUpdateTimeout, rsp.Data.Sensors.UpdateTime)
return &rsp.Data, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

func TestMalachiteClient_GetPower(t *testing.T) {
Expand Down Expand Up @@ -103,7 +104,8 @@ func TestMalachiteClient_GetPower(t *testing.T) {
defer s.Close()

c := &MalachiteClient{
urls: map[string]string{"realtime/power": s.URL},
emitter: metrics.DummyMetrics{},
urls: map[string]string{"realtime/power": s.URL},
}
got, err := c.GetPowerData()
if !tt.wantErr(t, err, fmt.Sprintf("GetPowerData()")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

func (c *MalachiteClient) GetSystemComputeStats() (*types.SystemComputeData, error) {
Expand All @@ -40,6 +43,7 @@ func (c *MalachiteClient) GetSystemComputeStats() (*types.SystemComputeData, err
return nil, fmt.Errorf("system compute stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutdated("compute", UpdateTimeout, rsp.Data.UpdateTime)
return &rsp.Data, nil
}

Expand All @@ -58,6 +62,7 @@ func (c *MalachiteClient) GetSystemMemoryStats() (*types.SystemMemoryData, error
return nil, fmt.Errorf("system memory stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutdated("memory", UpdateTimeout, rsp.Data.UpdateTime)
return &rsp.Data, nil
}

Expand All @@ -76,6 +81,7 @@ func (c *MalachiteClient) GetSystemIOStats() (*types.SystemDiskIoData, error) {
return nil, fmt.Errorf("system io stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutdated("io", UpdateTimeout, rsp.Data.UpdateTime)
return &rsp.Data, nil
}

Expand All @@ -94,6 +100,7 @@ func (c *MalachiteClient) GetSystemNetStats() (*types.SystemNetworkData, error)
return nil, fmt.Errorf("system network stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutdated("network", UpdateTimeout, rsp.Data.UpdateTime)
return &rsp.Data, nil
}

Expand Down Expand Up @@ -138,3 +145,18 @@ func (c *MalachiteClient) getSystemStats(kind SystemResourceKind) ([]byte, error

return ioutil.ReadAll(rsp.Body)
}

func (c *MalachiteClient) checkSystemStatsOutdated(statsType string, timeout time.Duration, updateTimestamp int64) {
updateTime := time.Unix(updateTimestamp, 0)
if time.Since(updateTime) <= timeout {
return
}

general.Warningf(
"malachite system %s stats outdated, last update time %s",
statsType, updateTime)
_ = c.emitter.StoreInt64(metricMalachiteSystemStatsOutdated, 1, metrics.MetricTypeNameCount, metrics.MetricTag{
Key: "type",
Val: statsType,
})
}
Loading

0 comments on commit 6eb5137

Please sign in to comment.