Skip to content

Commit

Permalink
Merge pull request #2077 from songrx1997/label-propagation-metrics
Browse files Browse the repository at this point in the history
Add label propagation metrics calculation logic
  • Loading branch information
k8s-ci-robot committed Apr 17, 2023
2 parents 7bab346 + 2a56103 commit 6080857
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 55 deletions.
1 change: 1 addition & 0 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
wait.Until(c.gc, c.gcPeriod, stopCh)
}()
go c.reflector.Run(stopCh)
go c.syncerMetrics.Run(stopCh)
<-stopCh
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/neg/metrics/label_propagation_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
annotationSize = "annotation_size_per_endpoint"
labelErrorNumber = "label_propagation_error_count"
numberOfEndpoints = "number_of_endpoints"
epWithAnnotation = "with_annotation"
totalEndpoints = "total"
)

var (
Expand All @@ -33,7 +35,7 @@ var (
}

endpointAnnotationLabels = []string{
"with_annotation",
"feature",
}

NumberOfEndpoints = prometheus.NewGaugeVec(
Expand Down
4 changes: 4 additions & 0 deletions pkg/neg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func RegisterMetrics() {
prometheus.MustRegister(InitializationLatency)
prometheus.MustRegister(SyncerStaleness)
prometheus.MustRegister(EPSStaleness)
prometheus.MustRegister(NumberOfEndpoints)
prometheus.MustRegister(LabelPropagationError)
prometheus.MustRegister(LabelNumber)
prometheus.MustRegister(AnnotationSize)

RegisterSyncerMetrics()
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestComputeLabelMetrics(t *testing.T) {
collector.syncerLabelProagationStats = tc.syncerLabelProagationStats
out := collector.computeLabelMetrics()
if diff := cmp.Diff(out, tc.expect); diff != "" {
t.Errorf("For test case %s, got %+v, want %+v, diff: %s", tc.desc, out, tc.expect, diff)
t.Errorf("For test case %s, (-want +got):\n%s", tc.desc, diff)
}
}
}
32 changes: 24 additions & 8 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// SyncerMetricsCollector is the interface syncers use to report their state
// to the metrics collector. Every method is keyed by the NEG syncer key.
type SyncerMetricsCollector interface {
// UpdateSyncer records the result of a sync for the syncer identified by key.
UpdateSyncer(key negtypes.NegSyncerKey, result *negtypes.NegSyncResult)
// SetSyncerEPMetrics records the endpoint and endpoint slice state counts
// for the syncer identified by key.
SetSyncerEPMetrics(key negtypes.NegSyncerKey, epState *negtypes.SyncerEPStat)
// SetLabelPropagationStats records the label propagation stats for the
// syncer identified by key.
SetLabelPropagationStats(key negtypes.NegSyncerKey, labelstatLabelPropagationStats LabelPropagationStats)
}

type SyncerMetrics struct {
Expand All @@ -50,11 +51,12 @@ type SyncerMetrics struct {
// NewNegMetricsCollector initializes SyncerMetrics. It does not start any goroutine itself; Run must be called to compute and export metrics periodically.
func NewNegMetricsCollector(exportInterval time.Duration, logger klog.Logger) *SyncerMetrics {
return &SyncerMetrics{
syncerStatusMap: make(map[negtypes.NegSyncerKey]string),
syncerEndpointStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
syncerEPSStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
metricsInterval: exportInterval,
logger: logger.WithName("NegMetricsCollector"),
syncerStatusMap: make(map[negtypes.NegSyncerKey]string),
syncerEndpointStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
syncerEPSStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
syncerLabelProagationStats: make(map[negtypes.NegSyncerKey]LabelPropagationStats),
metricsInterval: exportInterval,
logger: logger.WithName("NegMetricsCollector"),
}
}

Expand All @@ -79,6 +81,10 @@ func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {

// export aggregates the label propagation stats of all syncers and publishes
// the resulting endpoint counts to the NumberOfEndpoints gauge.
func (sm *SyncerMetrics) export() {
	aggregated := sm.computeLabelMetrics()
	total := float64(aggregated.NumberOfEndpoints)
	annotated := float64(aggregated.EndpointsWithAnnotation)

	NumberOfEndpoints.WithLabelValues(totalEndpoints).Set(total)
	NumberOfEndpoints.WithLabelValues(epWithAnnotation).Set(annotated)

	sm.logger.V(3).Info("Exporting syncer related metrics", "Number of Endpoints", aggregated.NumberOfEndpoints)
}

// UpdateSyncer update the status of corresponding syncer based on the syncResult.
Expand All @@ -87,7 +93,7 @@ func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *neg
defer sm.mu.Unlock()
if sm.syncerStatusMap == nil {
sm.syncerStatusMap = make(map[negtypes.NegSyncerKey]string)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap: %v", sm.syncerStatusMap)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap")
}
sm.syncerStatusMap[key] = string(syncResult.Result)
}
Expand All @@ -98,17 +104,27 @@ func (sm *SyncerMetrics) SetSyncerEPMetrics(key negtypes.NegSyncerKey, endpointS
defer sm.mu.Unlock()
if sm.syncerEndpointStateMap == nil {
sm.syncerEndpointStateMap = make(map[negtypes.NegSyncerKey]negtypes.StateCountMap)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPStateMap: %v", sm.syncerEndpointStateMap)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPStateMap")
}
sm.syncerEndpointStateMap[key] = endpointStat.EndpointStateCount

if sm.syncerEPSStateMap == nil {
sm.syncerEPSStateMap = make(map[negtypes.NegSyncerKey]negtypes.StateCountMap)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPSStateMap: %v", sm.syncerEPSStateMap)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPSStateMap")
}
sm.syncerEPSStateMap[key] = endpointStat.EndpointSliceStateCount
}

// SetLabelPropagationStats stores the label propagation stats reported by the
// syncer identified by key, replacing any value stored for that key before.
func (sm *SyncerMetrics) SetLabelPropagationStats(key negtypes.NegSyncerKey, labelstatLabelPropagationStats LabelPropagationStats) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	statsByKey := sm.syncerLabelProagationStats
	if statsByKey == nil {
		// The constructor normally creates this map; recreate it here so the
		// write below cannot panic on a nil map.
		statsByKey = make(map[negtypes.NegSyncerKey]LabelPropagationStats)
		sm.syncerLabelProagationStats = statsByKey
		sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerLabelProagationStats")
	}
	statsByKey[key] = labelstatLabelPropagationStats
}

// computeLabelMetrics aggregates label propagation metrics.
func (sm *SyncerMetrics) computeLabelMetrics() LabelPropagationMetrics {
sm.mu.Lock()
Expand Down
28 changes: 28 additions & 0 deletions pkg/neg/syncers/labels/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
)
Expand All @@ -44,6 +45,12 @@ type PodLabelMap map[string]string
// EndpointPodLabelMap maps a network endpoint to the pod labels used as its
// endpoint annotations.
type EndpointPodLabelMap map[negtypes.NetworkEndpoint]PodLabelMap

// Error type values passed to metrics.PublishLabelPropagationError.
const (
// Truncated is published when an error wraps ErrLabelTruncated.
Truncated = "truncated"
// TruncationFailure is published when an error wraps ErrLabelTruncationFailed.
TruncationFailure = "truncation_failed"
// OtherError is published for failures that are not truncation errors,
// e.g. when the pod for an endpoint cannot be retrieved.
OtherError = "other_error"
)

var (
ErrLabelTruncated = errors.New("label is truncated")
ErrLabelTruncationFailed = errors.New("failed to truncate label")
Expand All @@ -68,6 +75,7 @@ func GetPodLabelMap(pod *v1.Pod, lpConfig PodLabelPropagationConfig) (PodLabelMa
labelVal, err := truncatePodLabel(lpKey, val, label.MaxLabelSizeBytes)
if err != nil {
errs = append(errs, err)
publishLabelPropagationTruncationMetrics(err)
}

// Add the label to the map only if the truncation result is valid
Expand All @@ -82,6 +90,16 @@ func GetPodLabelMap(pod *v1.Pod, lpConfig PodLabelPropagationConfig) (PodLabelMa
return labelMap, nil
}

// publishLabelPropagationTruncationMetrics publishes a label propagation
// error metric for errors that occurred during label truncation. Errors that
// wrap neither ErrLabelTruncated nor ErrLabelTruncationFailed are ignored.
func publishLabelPropagationTruncationMetrics(err error) {
	switch {
	case errors.Is(err, ErrLabelTruncated):
		metrics.PublishLabelPropagationError(Truncated)
	case errors.Is(err, ErrLabelTruncationFailed):
		metrics.PublishLabelPropagationError(TruncationFailure)
	}
}

// truncatePodLabel calculates the potentially truncated label value to ensure that len(key) + len(label) <= maxTotalSize.
// It will return:
//
Expand All @@ -100,3 +118,13 @@ func truncatePodLabel(key, label string, maxTotalSize int) (string, error) {
truncatedVal := string(labelBytes[:maxTotalSize-len(keyBytes)])
return truncatedVal, fmt.Errorf("%w: `%s:%s` is truncated to `%s:%s` because the total length exceeded the limit, length: %d, limit: %d", ErrLabelTruncated, key, label, key, truncatedVal, len(key)+len(label), maxTotalSize)
}

// GetPodLabelMapSize returns the total size in bytes of all keys and values
// in podLabelMap. A nil or empty map has size 0.
func GetPodLabelMapSize(podLabelMap PodLabelMap) int {
	var size int
	for key, val := range podLabelMap {
		// len on a string is already its length in bytes, so no []byte
		// conversion is needed.
		size += len(key) + len(val)
	}
	return size
}
32 changes: 31 additions & 1 deletion pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *transactionSyncer) syncInternalImpl() error {
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
currentMap, currentPodLabelMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return err
}
Expand Down Expand Up @@ -312,8 +312,11 @@ func (s *transactionSyncer) syncInternalImpl() error {
// Only fetch label from pod for L7 endpoints
if flags.F.EnableNEGLabelPropagation && s.NegType == negtypes.VmIpPortEndpointType {
endpointPodLabelMap = getEndpointPodLabelMap(addEndpoints, endpointPodMap, s.podLister, s.podLabelPropagationConfig, s.recorder, s.logger)
publishAnnotationSizeMetrics(addEndpoints, endpointPodLabelMap)
}

s.syncCollector.SetLabelPropagationStats(s.NegSyncerKey, collectLabelStats(currentPodLabelMap, endpointPodLabelMap, targetMap))

if s.needCommit() {
s.commitPods(committedEndpoints, endpointPodMap)
}
Expand Down Expand Up @@ -866,11 +869,13 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
key := fmt.Sprintf("%s/%s", endpointPodMap[endpoint].Namespace, endpointPodMap[endpoint].Name)
obj, ok, err := podLister.GetByKey(key)
if err != nil || !ok {
metrics.PublishLabelPropagationError(labels.OtherError)
logger.Error(err, "getEndpointPodLabelMap: error getting pod", "pod", key, "exist", ok)
continue
}
pod, ok := obj.(*v1.Pod)
if !ok {
metrics.PublishLabelPropagationError(labels.OtherError)
logger.Error(nil, "expected type *v1.Pod", "pod", key, "type", fmt.Sprintf("%T", obj))
continue
}
Expand All @@ -883,3 +888,28 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
}
return endpointPodLabelMap
}

// publishAnnotationSizeMetrics walks every endpoint that is about to be
// attached and publishes the byte size and the number of its labels.
func publishAnnotationSizeMetrics(endpoints map[string]negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap) {
	for _, zoneEndpoints := range endpoints {
		for ep := range zoneEndpoints {
			// An endpoint without an entry yields a nil map, which is
			// published as size 0 with 0 labels.
			podLabels := endpointPodLabelMap[ep]
			annotationBytes := labels.GetPodLabelMapSize(podLabels)
			labelCount := len(podLabels)
			metrics.PublishAnnotationMetrics(annotationBytes, labelCount)
		}
	}
}

// collectLabelStats counts the endpoints in targetEndpointMap and how many of
// them have a non-nil label map in either currentPodLabelMap or addPodLabelMap.
func collectLabelStats(currentPodLabelMap, addPodLabelMap labels.EndpointPodLabelMap, targetEndpointMap map[string]negtypes.NetworkEndpointSet) metrics.LabelPropagationStats {
	var stats metrics.LabelPropagationStats
	for _, zoneEndpoints := range targetEndpointMap {
		for ep := range zoneEndpoints {
			stats.NumberOfEndpoints++
			if currentPodLabelMap[ep] == nil && addPodLabelMap[ep] == nil {
				// No annotation known for this endpoint from either source.
				continue
			}
			stats.EndpointsWithAnnotation++
		}
	}
	return stats
}
109 changes: 106 additions & 3 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ func TestUnknownNodes(t *testing.T) {
}

// Check that unknown zone did not cause endpoints to be removed
out, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
out, _, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
if err != nil {
t.Errorf("errored retrieving existing network endpoints")
}
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func TestEnableDegradedMode(t *testing.T) {
(s.syncer.(*syncer)).stopped = false
tc.modify(s)

out, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
out, _, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
if err != nil {
t.Errorf("errored retrieving existing network endpoints")
}
Expand All @@ -1777,7 +1777,7 @@ func TestEnableDegradedMode(t *testing.T) {
t.Errorf("after syncInternal, error state is %v, expected to be %v", s.inErrorState(), tc.expectedInErrorState)
}
err = wait.PollImmediate(time.Second, 3*time.Second, func() (bool, error) {
out, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
out, _, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -1910,6 +1910,109 @@ func TestGetEndpointPodLabelMap(t *testing.T) {
}
}

// TestCollectLabelStats verifies that collectLabelStats counts every endpoint
// in the target map and counts an endpoint as annotated when it has a label
// map in either the current or the to-be-added endpoint label map.
func TestCollectLabelStats(t *testing.T) {
	t.Parallel()

	testIP1 := "1.2.3.4"
	testIP2 := "1.2.3.5"
	testIP3 := "1.2.3.6"
	testIP4 := "1.2.3.7"
	testPort := int64(80)
	endpoint1 := negtypes.NetworkEndpoint{IP: testIP1, Node: negtypes.TestInstance1, Port: strconv.Itoa(int(testPort))}
	endpoint2 := negtypes.NetworkEndpoint{IP: testIP2, Node: negtypes.TestInstance2, Port: strconv.Itoa(int(testPort))}
	endpoint3 := negtypes.NetworkEndpoint{IP: testIP3, Node: negtypes.TestInstance3, Port: strconv.Itoa(int(testPort))}
	endpoint4 := negtypes.NetworkEndpoint{IP: testIP4, Node: negtypes.TestInstance4, Port: strconv.Itoa(int(testPort))}

	for _, tc := range []struct {
		desc              string
		curLabelMap       labels.EndpointPodLabelMap
		addLabelMap       labels.EndpointPodLabelMap
		targetEndpointMap map[string]negtypes.NetworkEndpointSet
		expect            metrics.LabelPropagationStats
	}{
		{
			desc:              "Empty inputs",
			curLabelMap:       labels.EndpointPodLabelMap{},
			addLabelMap:       labels.EndpointPodLabelMap{},
			targetEndpointMap: map[string]negtypes.NetworkEndpointSet{},
			expect: metrics.LabelPropagationStats{
				EndpointsWithAnnotation: 0,
				NumberOfEndpoints:       0,
			},
		},
		{
			desc: "No new endpoints to be added",
			curLabelMap: labels.EndpointPodLabelMap{
				endpoint1: labels.PodLabelMap{
					"foo": "bar",
				},
			},
			addLabelMap: labels.EndpointPodLabelMap{},
			targetEndpointMap: map[string]negtypes.NetworkEndpointSet{
				testZone1: negtypes.NewNetworkEndpointSet(
					endpoint1,
					endpoint2,
				),
			},
			expect: metrics.LabelPropagationStats{
				EndpointsWithAnnotation: 1,
				NumberOfEndpoints:       2,
			},
		},
		{
			desc: "Some endpoints to be added",
			curLabelMap: labels.EndpointPodLabelMap{
				endpoint1: labels.PodLabelMap{
					"foo": "bar",
				},
			},
			addLabelMap: labels.EndpointPodLabelMap{
				endpoint3: labels.PodLabelMap{
					"foo": "bar",
				},
			},
			targetEndpointMap: map[string]negtypes.NetworkEndpointSet{
				testZone1: negtypes.NewNetworkEndpointSet(
					endpoint1,
					endpoint2,
				),
				testZone2: negtypes.NewNetworkEndpointSet(
					endpoint3,
					endpoint4,
				),
			},
			expect: metrics.LabelPropagationStats{
				EndpointsWithAnnotation: 2,
				NumberOfEndpoints:       4,
			},
		},
		{
			desc:        "Only newly added endpoints",
			curLabelMap: labels.EndpointPodLabelMap{},
			addLabelMap: labels.EndpointPodLabelMap{
				endpoint3: labels.PodLabelMap{
					"foo": "bar",
				},
			},
			targetEndpointMap: map[string]negtypes.NetworkEndpointSet{
				testZone2: negtypes.NewNetworkEndpointSet(
					endpoint3,
					endpoint4,
				),
			},
			expect: metrics.LabelPropagationStats{
				EndpointsWithAnnotation: 1,
				NumberOfEndpoints:       2,
			},
		},
	} {
		out := collectLabelStats(tc.curLabelMap, tc.addLabelMap, tc.targetEndpointMap)
		// cmp.Diff(x, y) marks x-only lines with "-" and y-only lines with "+",
		// so the expected value goes first to match the "(-want +got)" label.
		if diff := cmp.Diff(tc.expect, out); diff != "" {
			t.Errorf("For test case %s: (-want +got): \n%s", tc.desc, diff)
		}
	}
}

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG)
Expand Down
Loading

0 comments on commit 6080857

Please sign in to comment.