Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): emit metrics for out-of-date/missing malachite data #773

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package client
import (
"fmt"
"sync"
"time"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

const (
Expand All @@ -37,6 +39,18 @@ const (
RealtimePowerResource = "realtime/power"
)

const (
UpdateTimeout = 30 * time.Second
RealtimeUpdateTimeout = 5 * time.Second
)

const (
metricMalachiteSystemStatsOutOfDate = "malachite_system_stats_out_of_date"
metricMalachiteCgroupStatsOutOfDate = "malachite_cgroup_stats_out_of_date"

metricMalachiteContainerStatsMissing = "malachite_container_stats_missing"
)

type SystemResourceKind int

const (
Expand All @@ -52,10 +66,11 @@ type MalachiteClient struct {
urls map[string]string
relativePathFunc *func(podUID, containerId string) (string, error)

emitter metrics.MetricEmitter
fetcher pod.PodFetcher
}

func NewMalachiteClient(fetcher pod.PodFetcher) *MalachiteClient {
func NewMalachiteClient(fetcher pod.PodFetcher, emitter metrics.MetricEmitter) *MalachiteClient {
urls := make(map[string]string)
for _, path := range []string{
CgroupResource,
Expand All @@ -70,6 +85,7 @@ func NewMalachiteClient(fetcher pod.PodFetcher) *MalachiteClient {

return &MalachiteClient{
fetcher: fetcher,
emitter: emitter,
urls: urls,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

func (c *MalachiteClient) GetCgroupStats(cgroupPath string) (*types.MalachiteCgroupInfo, error) {
Expand Down Expand Up @@ -76,6 +79,7 @@ func (c *MalachiteClient) GetCgroupStats(cgroupPath string) (*types.MalachiteCgr
return nil, fmt.Errorf("unknow cgroup type %s in cgroup info", cgroupInfo.CgroupType)
}

c.checkCgroupStatsOutOfDate(cgroupPath, cgroupInfo)
return cgroupInfo, nil
}

Expand Down Expand Up @@ -109,3 +113,54 @@ func (c *MalachiteClient) getCgroupStats(cgroupPath string) ([]byte, error) {

return ioutil.ReadAll(rsp.Body)
}

func (c *MalachiteClient) checkCgroupStatsOutOfDate(cgroupPath string, stats *types.MalachiteCgroupInfo) {
checkAndEmit := func(updateTimestamp int64, statsType string) {
updateTime := time.Unix(updateTimestamp, 0)
if time.Since(updateTime) <= UpdateTimeout {
return
}

general.Warningf(
"malachite cgroup %s stats outdated, cgroup %s, last update time %s",
statsType, cgroupPath, updateTime)
_ = c.emitter.StoreInt64(metricMalachiteCgroupStatsOutOfDate, 1, metrics.MetricTypeNameCount,
metrics.MetricTag{Key: "type", Val: statsType},
metrics.MetricTag{Key: "cgroupPath", Val: cgroupPath},
)
}

if stats.CgroupType == "V1" && stats.V1 != nil {
if stats.V1.Cpu != nil {
checkAndEmit(stats.V1.Cpu.UpdateTime, "cpu")
}
if stats.V1.Memory != nil {
checkAndEmit(stats.V1.Memory.UpdateTime, "memory")
}
if stats.V1.CpuSet != nil {
checkAndEmit(stats.V1.CpuSet.UpdateTime, "cpuset")
}
if stats.V1.Blkio != nil {
checkAndEmit(stats.V1.Blkio.UpdateTime, "blkio")
}
if stats.V1.NetCls != nil {
checkAndEmit(stats.V1.NetCls.UpdateTime, "netcls")
}
} else if stats.CgroupType == "V2" && stats.V2 != nil {
if stats.V2.Cpu != nil {
checkAndEmit(stats.V2.Cpu.UpdateTime, "cpu")
}
if stats.V2.Memory != nil {
checkAndEmit(stats.V2.Memory.UpdateTime, "memory")
}
if stats.V2.CpuSet != nil {
checkAndEmit(stats.V2.CpuSet.UpdateTime, "cpuset")
}
if stats.V2.Blkio != nil {
checkAndEmit(stats.V2.Blkio.UpdateTime, "blkio")
}
if stats.V2.NetCls != nil {
checkAndEmit(stats.V2.NetCls.UpdateTime, "netcls")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

var (
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestGetCgroupStats(t *testing.T) {
}))
defer server.Close()

malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{})
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{}, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
CgroupResource: server.URL,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
cgroupcm "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -59,7 +60,11 @@ func (c *MalachiteClient) GetPodStats(ctx context.Context, podUID string) (map[s
containerID := native.TrimContainerIDPrefix(containerStatus.ContainerID)
stats, err := c.GetPodContainerStats(podUID, containerID)
if err != nil {
general.Errorf("GetPodStats err %v", err)
general.Errorf("failed to get pod %s container %s stats, err %v", podUID, containerID, err)
_ = c.emitter.StoreInt64(metricMalachiteContainerStatsMissing, 1, metrics.MetricTypeNameCount,
metrics.MetricTag{Key: "podUID", Val: podUID},
metrics.MetricTag{Key: "containerID", Val: containerID},
)
continue
}
containersStats[containerStatus.Name] = stats
Expand All @@ -85,5 +90,6 @@ func (c *MalachiteClient) GetPodContainerStats(podUID, containerID string) (*typ
if err != nil {
return nil, fmt.Errorf("GetPodContainerStats %s/%v get-status %v err %v", podUID, containerID, cgroupPath, err)
}

return containersStats, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
)

Expand Down Expand Up @@ -153,7 +154,7 @@ func TestGetPodContainerStats(t *testing.T) {
}
fetcher := &pod.PodFetcherStub{PodList: pods}

malachiteClient := NewMalachiteClient(fetcher)
malachiteClient := NewMalachiteClient(fetcher, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
CgroupResource: server.URL,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *MalachiteClient) GetPowerData() (*types.PowerData, error) {
return nil, fmt.Errorf("system compute stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutOfDate("power", RealtimeUpdateTimeout, rsp.Data.Sensors.UpdateTime)
return &rsp.Data, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

func TestMalachiteClient_GetPower(t *testing.T) {
Expand Down Expand Up @@ -103,7 +104,8 @@ func TestMalachiteClient_GetPower(t *testing.T) {
defer s.Close()

c := &MalachiteClient{
urls: map[string]string{"realtime/power": s.URL},
emitter: metrics.DummyMetrics{},
urls: map[string]string{"realtime/power": s.URL},
}
got, err := c.GetPowerData()
if !tt.wantErr(t, err, fmt.Sprintf("GetPowerData()")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

func (c *MalachiteClient) GetSystemComputeStats() (*types.SystemComputeData, error) {
Expand All @@ -40,6 +43,7 @@ func (c *MalachiteClient) GetSystemComputeStats() (*types.SystemComputeData, err
return nil, fmt.Errorf("system compute stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutOfDate("compute", UpdateTimeout, rsp.Data.UpdateTime)
return &rsp.Data, nil
}

Expand All @@ -58,6 +62,7 @@ func (c *MalachiteClient) GetSystemMemoryStats() (*types.SystemMemoryData, error
return nil, fmt.Errorf("system memory stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutOfDate("memory", UpdateTimeout, rsp.Data.UpdateTime)
return &rsp.Data, nil
}

Expand All @@ -76,6 +81,7 @@ func (c *MalachiteClient) GetSystemIOStats() (*types.SystemDiskIoData, error) {
return nil, fmt.Errorf("system io stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutOfDate("io", UpdateTimeout, rsp.Data.UpdateTime)
return &rsp.Data, nil
}

Expand All @@ -94,6 +100,7 @@ func (c *MalachiteClient) GetSystemNetStats() (*types.SystemNetworkData, error)
return nil, fmt.Errorf("system network stats status is not ok, %d", rsp.Status)
}

c.checkSystemStatsOutOfDate("network", UpdateTimeout, rsp.Data.UpdateTime)
return &rsp.Data, nil
}

Expand Down Expand Up @@ -138,3 +145,18 @@ func (c *MalachiteClient) getSystemStats(kind SystemResourceKind) ([]byte, error

return ioutil.ReadAll(rsp.Body)
}

func (c *MalachiteClient) checkSystemStatsOutOfDate(statsType string, timeout time.Duration, updateTimestamp int64) {
updateTime := time.Unix(updateTimestamp, 0)
if time.Since(updateTime) <= timeout {
return
}

general.Warningf(
"malachite system %s stats outdated, last update time %s",
statsType, updateTime)
_ = c.emitter.StoreInt64(metricMalachiteSystemStatsOutOfDate, 1, metrics.MetricTypeNameCount, metrics.MetricTag{
Key: "type",
Val: statsType,
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

var (
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestGetSystemComputeStats(t *testing.T) {
server := getSystemTestServer(data)
defer server.Close()

malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{})
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{}, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
SystemComputeResource: server.URL,
})
Expand All @@ -108,7 +109,7 @@ func TestGetSystemCPUCodeName(t *testing.T) {
data, _ := json.Marshal(fakeSystemCompute)
server := getSystemTestServer(data)
defer server.Close()
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{})
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{}, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
SystemComputeResource: server.URL,
})
Expand All @@ -129,7 +130,7 @@ func TestGetSystemMemoryStats(t *testing.T) {
server := getSystemTestServer(data)
defer server.Close()

malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{})
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{}, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
SystemMemoryResource: server.URL,
})
Expand All @@ -150,7 +151,7 @@ func TestGetSystemIOStats(t *testing.T) {
server := getSystemTestServer(data)
defer server.Close()

malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{})
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{}, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
SystemIOResource: server.URL,
})
Expand All @@ -171,7 +172,7 @@ func TestGetSystemNetStats(t *testing.T) {
server := getSystemTestServer(data)
defer server.Close()

malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{})
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{}, metrics.DummyMetrics{})
malachiteClient.SetURL(map[string]string{
SystemNetResource: server.URL,
})
Expand All @@ -191,7 +192,7 @@ func TestGetSystemNonExistStats(t *testing.T) {
server := getSystemTestServer([]byte{})
defer server.Close()

malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{})
malachiteClient := NewMalachiteClient(&pod.PodFetcherStub{}, metrics.DummyMetrics{})
_, err := malachiteClient.getSystemStats(100)
assert.ErrorContains(t, err, "unknown")
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewMalachiteMetricsProvisioner(baseConf *global.BaseConfiguration, _ *metas
emitter metrics.MetricEmitter, fetcher pod.PodFetcher, metricStore *utilmetric.MetricStore, machineInfo *machine.KatalystMachineInfo,
) types.MetricsProvisioner {
return &MalachiteMetricsProvisioner{
malachiteClient: client.NewMalachiteClient(fetcher),
malachiteClient: client.NewMalachiteClient(fetcher, emitter),
metricStore: metricStore,
emitter: emitter,
baseConf: baseConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewMalachiteRealtimeMetricsProvisioner(baseConf *global.BaseConfiguration,
emitter metrics.MetricEmitter, fetcher pod.PodFetcher, metricStore *utilmetric.MetricStore, _ *machine.KatalystMachineInfo,
) types.MetricsProvisioner {
inner := &MalachiteMetricsProvisioner{
malachiteClient: client.NewMalachiteClient(fetcher),
malachiteClient: client.NewMalachiteClient(fetcher, emitter),
metricStore: metricStore,
emitter: emitter,
baseConf: baseConf,
Expand Down