Skip to content

Commit

Permalink
Merge pull request #768 from xu282934741/add-message-tag-in-allocate_…
Browse files Browse the repository at this point in the history
…failed_metric

 feat(qrm): add an error_message tag to the QRM plugins' alloc_failed, get_topology_hints_failed, and remove_pod_failed metrics
  • Loading branch information
xu282934741 authored Jan 20, 2025
2 parents 0c24ce6 + f609b01 commit 74b8399
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 11 deletions.
4 changes: 2 additions & 2 deletions cmd/base/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package katalyst_base
import (
"context"
"encoding/json"
"strings"
"time"

"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
)

const (
Expand Down Expand Up @@ -55,7 +55,7 @@ func (h *HealthzChecker) Run(ctx context.Context) {
if !result.Ready {
_ = h.emitter.StoreInt64(MetricNameUnhealthyRule, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "rule", Val: string(key)},
metrics.MetricTag{Key: "error_message", Val: strings.ReplaceAll(result.Message, " ", "_")})
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(result.Message)})
general.Warningf("%v healthz rule is not ready,error message is %v", string(key), result.Message)
}
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
"github.com/kubewharf/katalyst-core/pkg/util/native"
"github.com/kubewharf/katalyst-core/pkg/util/process"
"github.com/kubewharf/katalyst-core/pkg/util/timemonitor"
Expand Down Expand Up @@ -694,7 +695,9 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context,
defer func() {
p.RUnlock()
if err != nil {
_ = p.emitter.StoreInt64(util.MetricNameGetTopologyHintsFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameGetTopologyHintsFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(err)})

general.ErrorS(err, "GetTopologyHints failed",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
Expand Down Expand Up @@ -840,7 +843,8 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
}
} else if respErr != nil {
_ = p.removeContainer(req.PodUid, req.ContainerName)
_ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(respErr)})
}

p.Unlock()
Expand Down Expand Up @@ -933,7 +937,8 @@ func (p *DynamicPolicy) RemovePod(ctx context.Context,
defer func() {
p.Unlock()
if err != nil {
_ = p.emitter.StoreInt64(util.MetricNameRemovePodFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameRemovePodFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(err)})
general.ErrorS(err, "RemovePod failed", "podUID", req.PodUid)
}
general.InfoS("finished", "duration", time.Since(startTime).String(), "podUID", req.PodUid)
Expand Down
13 changes: 9 additions & 4 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/asyncworker"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
"github.com/kubewharf/katalyst-core/pkg/util/native"
"github.com/kubewharf/katalyst-core/pkg/util/process"
"github.com/kubewharf/katalyst-core/pkg/util/timemonitor"
Expand Down Expand Up @@ -584,7 +585,8 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context,
defer func() {
p.RUnlock()
if err != nil {
_ = p.emitter.StoreInt64(util.MetricNameGetTopologyHintsFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameGetTopologyHintsFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(err)})
general.ErrorS(err, "GetTopologyHints failed",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
Expand Down Expand Up @@ -626,7 +628,8 @@ func (p *DynamicPolicy) RemovePod(ctx context.Context,
defer func() {
p.Unlock()
if err != nil {
_ = p.emitter.StoreInt64(util.MetricNameRemovePodFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameRemovePodFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(err)})
general.ErrorS(err, "RemovePod failed", "podUID", req.PodUid)
}
general.InfoS("finished", "duration", time.Since(startTime), "podUID", req.PodUid)
Expand All @@ -652,7 +655,8 @@ func (p *DynamicPolicy) RemovePod(ctx context.Context,
err = p.removePod(req.PodUid)
if err != nil {
general.ErrorS(err, "remove pod failed with error", "podUID", req.PodUid)
_ = p.emitter.StoreInt64(util.MetricNameRemovePodFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameRemovePodFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(err)})
return nil, err
}

Expand Down Expand Up @@ -944,7 +948,8 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
}
} else if respErr != nil {
_ = p.removeContainer(req.PodUid, req.ContainerName)
_ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(respErr)})
}

p.Unlock()
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/qrm-plugins/network/staticpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
cgroupcmutils "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
"github.com/kubewharf/katalyst-core/pkg/util/native"
"github.com/kubewharf/katalyst-core/pkg/util/qos"
)
Expand Down Expand Up @@ -290,7 +291,8 @@ func (p *StaticPolicy) GetTopologyHints(_ context.Context,
defer func() {
p.Unlock()
if err != nil {
_ = p.emitter.StoreInt64(util.MetricNameGetTopologyHintsFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameGetTopologyHintsFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(err)})
}
}()

Expand Down Expand Up @@ -531,7 +533,8 @@ func (p *StaticPolicy) Allocate(_ context.Context,
defer func() {
p.Unlock()
if err != nil {
_ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(err)})
}
}()

Expand Down
9 changes: 9 additions & 0 deletions pkg/util/general/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,12 @@ func StructToString(val interface{}) string {
func BytesToString(b []byte) string {
	// A plain conversion: the resulting string holds the same bytes as b
	// (an empty or nil slice yields the empty string).
	converted := string(b)
	return converted
}

// TruncateString returns s cut down to at most its first n characters
// (Unicode code points, not bytes).
//
// If s already has n or fewer characters it is returned unchanged. A
// non-positive n yields the empty string instead of panicking on a negative
// slice bound. The result is always a prefix of s: bytes are never rewritten,
// so malformed UTF-8 inside the kept prefix is preserved as-is (each invalid
// byte counts as one character).
func TruncateString(s string, n int) string {
	if n <= 0 {
		return ""
	}
	// A string never has more characters than bytes, so this cheap check
	// covers the common "already short enough" case without scanning s.
	if len(s) <= n {
		return s
	}

	seen := 0
	for byteIdx := range s {
		// byteIdx is the start of the (seen+1)-th character; once n
		// characters have been passed, everything from here on is cut.
		if seen == n {
			return s[:byteIdx]
		}
		seen++
	}
	// s holds n or fewer characters even though it is longer than n bytes.
	return s
}
9 changes: 9 additions & 0 deletions pkg/util/metric/store_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package metric

import (
"fmt"
"strings"
"time"

v1 "k8s.io/api/core/v1"
Expand All @@ -32,6 +34,8 @@ type Aggregator string
const (
// Named Aggregator values.
AggregatorSum Aggregator = "sum"
AggregatorAvg Aggregator = "avg"

// MaxTagLength is the maximum number of characters kept in a metric tag
// value; MetricTagValueFormat truncates formatted values to this length.
MaxTagLength = 255
)

// ContainerMetricFilter is used to filter out unnecessary metrics if this function returns false
Expand Down Expand Up @@ -141,3 +145,8 @@ func (c *MetricStore) AggregateCoreMetric(cpuset machine.CPUSet, metricName stri
}
return data
}

// MetricTagValueFormat turns an arbitrary value into a string usable as a
// metric tag value: the value is rendered with fmt's default formatting, every
// space is replaced by an underscore, and the result is truncated to at most
// MaxTagLength characters.
func MetricTagValueFormat(tagValue interface{}) string {
	rendered := fmt.Sprint(tagValue)
	underscored := strings.ReplaceAll(rendered, " ", "_")
	return general.TruncateString(underscored, MaxTagLength)
}

0 comments on commit 74b8399

Please sign in to comment.