From 505cdc218ef7fd7083dfba128f166d1fdb6ad491 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 10 Jul 2024 18:33:41 -0700 Subject: [PATCH] Using ingester id instead of address on the distributor metrics. (#6078) * Using ingester ID on the distributor metrics Signed-off-by: alanprot * fix test Signed-off-by: alanprot * fix TestPush_QuorumError test Signed-off-by: alanprot * Changelog Signed-off-by: alanprot --------- Signed-off-by: alanprot --- CHANGELOG.md | 1 + .../shuffle_sharding_grouper_test.go | 4 + pkg/distributor/distributor.go | 22 +++-- pkg/distributor/distributor_test.go | 90 +++++++++---------- pkg/distributor/query.go | 23 +++-- pkg/ring/model.go | 10 +++ pkg/ring/ring.go | 17 ++++ pkg/ring/util_test.go | 4 + 8 files changed, 114 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ab0be698a..da1dd4cc3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906 * [CHANGE] Ingester: Remove `-querier.query-store-for-labels-enabled` flag. Querying long-term store for labels is always enabled. #5984 * [CHANGE] Server: Instrument `cortex_request_duration_seconds` metric with native histogram. If `native-histograms` feature is enabled in monitoring Prometheus then the metric name needs to be updated in your dashboards. #6056 +* [CHANGE] Distributor/Ingester: Change `cortex_distributor_ingester_appends_total`, `cortex_distributor_ingester_append_failures_total`, `cortex_distributor_ingester_queries_total`, and `cortex_distributor_ingester_query_failures_total` metrics to use the ingester ID instead of its IP as the label value. #6078 * [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 * [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005 * [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index ab19ece87b..fb935c9c9f 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -747,6 +747,10 @@ type RingMock struct { mock.Mock } +func (r *RingMock) GetInstanceIdByAddr(addr string) (string, error) { + return "", nil +} + func (r *RingMock) Collect(ch chan<- prometheus.Metric) {} func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index c4346df74e..055053b436 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -734,10 +734,12 @@ func (d *Distributor) cleanStaleIngesterMetrics() { return } - ipsMap := map[string]struct{}{} + idsMap := map[string]struct{}{} for _, ing := range append(healthy, unhealthy...) { - ipsMap[ing.Addr] = struct{}{} + if id, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr); err == nil { + idsMap[id] = struct{}{} + } } ingesterMetrics := []*prometheus.CounterVec{d.ingesterAppends, d.ingesterAppendFailures, d.ingesterQueries, d.ingesterQueryFailures} @@ -751,7 +753,7 @@ func (d *Distributor) cleanStaleIngesterMetrics() { } for _, lbls := range metrics { - if _, ok := ipsMap[lbls.Get("ingester")]; !ok { + if _, ok := idsMap[lbls.Get("ingester")]; !ok { err := util.DeleteMatchingLabels(m, map[string]string{"ingester": lbls.Get("ingester")}) if err != nil { level.Warn(d.log).Log("msg", "error cleaning metrics: DeleteMatchingLabels", "err", err) @@ -956,6 +958,12 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time if err != nil { return err } + + id, err := d.ingestersRing.GetInstanceIdByAddr(ingester.Addr) + if err != nil { + level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ingester.Addr, "err", err) + } + c := h.(ingester_client.HealthAndIngesterClient) req := cortexpb.PreallocWriteRequestFromPool() @@ -972,15 +980,15 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time } if len(metadata) > 0 { - d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc() + d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc() if err != nil { - d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata, getErrorStatus(err)).Inc() + d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc() } } if len(timeseries) > 0 { - d.ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc() + d.ingesterAppends.WithLabelValues(id, typeSamples).Inc() if err != nil { - d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples, getErrorStatus(err)).Inc() + d.ingesterAppendFailures.WithLabelValues(id, typeSamples, getErrorStatus(err)).Inc() } } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ba45efd636..75c104528d 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -232,12 +232,12 @@ func TestDistributor_Push(t *testing.T) { expectedMetrics: ` # HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters. # TYPE cortex_distributor_ingester_append_failures_total counter - cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="samples"} 1 + cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="5xx",type="samples"} 1 # HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters. # TYPE cortex_distributor_ingester_appends_total counter - cortex_distributor_ingester_appends_total{ingester="0",type="samples"} 1 - cortex_distributor_ingester_appends_total{ingester="1",type="samples"} 1 - cortex_distributor_ingester_appends_total{ingester="2",type="samples"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-0",type="samples"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-1",type="samples"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-2",type="samples"} 1 `, }, "A push to ingesters should report the correct metrics with no samples": { @@ -251,12 +251,12 @@ func TestDistributor_Push(t *testing.T) { expectedMetrics: ` # HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters. # TYPE cortex_distributor_ingester_append_failures_total counter - cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="metadata"} 1 + cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="5xx",type="metadata"} 1 # HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters. # TYPE cortex_distributor_ingester_appends_total counter - cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1 - cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1 - cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-2",type="metadata"} 1 `, }, "A push to overloaded ingesters should report the correct metrics": { @@ -268,14 +268,14 @@ func TestDistributor_Push(t *testing.T) { expectedResponse: emptyResponse, ingesterError: httpgrpc.Errorf(http.StatusTooManyRequests, "Fail"), expectedMetrics: ` - # HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters. - # TYPE cortex_distributor_ingester_append_failures_total counter - cortex_distributor_ingester_append_failures_total{ingester="2",status="4xx",type="metadata"} 1 # HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters. # TYPE cortex_distributor_ingester_appends_total counter - cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1 - cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1 - cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-2",type="metadata"} 1 + # HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters. + # TYPE cortex_distributor_ingester_append_failures_total counter + cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="4xx",type="metadata"} 1 `, }, "A push to 3 happy ingesters should succeed, histograms": { @@ -436,14 +436,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) { d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111) h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend) - d.ingesterAppends.WithLabelValues(h[0].Addr, typeMetadata).Inc() - d.ingesterAppendFailures.WithLabelValues(h[0].Addr, typeMetadata, "2xx").Inc() - d.ingesterAppends.WithLabelValues(h[1].Addr, typeMetadata).Inc() - d.ingesterAppendFailures.WithLabelValues(h[1].Addr, typeMetadata, "2xx").Inc() - d.ingesterQueries.WithLabelValues(h[0].Addr).Inc() - d.ingesterQueries.WithLabelValues(h[1].Addr).Inc() - d.ingesterQueryFailures.WithLabelValues(h[0].Addr).Inc() - d.ingesterQueryFailures.WithLabelValues(h[1].Addr).Inc() + ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr) + ingId1, _ := r.GetInstanceIdByAddr(h[1].Addr) + d.ingesterAppends.WithLabelValues(ingId0, typeMetadata).Inc() + d.ingesterAppendFailures.WithLabelValues(ingId0, typeMetadata, "2xx").Inc() + d.ingesterAppends.WithLabelValues(ingId1, typeMetadata).Inc() + d.ingesterAppendFailures.WithLabelValues(ingId1, typeMetadata, "2xx").Inc() + d.ingesterQueries.WithLabelValues(ingId0).Inc() + d.ingesterQueries.WithLabelValues(ingId1).Inc() + d.ingesterQueryFailures.WithLabelValues(ingId0).Inc() + d.ingesterQueryFailures.WithLabelValues(ingId1).Inc() require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples. @@ -489,27 +491,27 @@ func TestDistributor_MetricsCleanup(t *testing.T) { # HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters. # TYPE cortex_distributor_ingester_append_failures_total counter - cortex_distributor_ingester_append_failures_total{ingester="0",status="2xx",type="metadata"} 1 - cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1 + cortex_distributor_ingester_append_failures_total{ingester="ingester-0",status="2xx",type="metadata"} 1 + cortex_distributor_ingester_append_failures_total{ingester="ingester-1",status="2xx",type="metadata"} 1 # HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters. # TYPE cortex_distributor_ingester_appends_total counter - cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1 - cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1 # HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters. # TYPE cortex_distributor_ingester_queries_total counter - cortex_distributor_ingester_queries_total{ingester="0"} 1 - cortex_distributor_ingester_queries_total{ingester="1"} 1 + cortex_distributor_ingester_queries_total{ingester="ingester-0"} 1 + cortex_distributor_ingester_queries_total{ingester="ingester-1"} 1 # HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters. # TYPE cortex_distributor_ingester_query_failures_total counter - cortex_distributor_ingester_query_failures_total{ingester="0"} 1 - cortex_distributor_ingester_query_failures_total{ingester="1"} 1 + cortex_distributor_ingester_query_failures_total{ingester="ingester-0"} 1 + cortex_distributor_ingester_query_failures_total{ingester="ingester-1"} 1 `), metrics...)) d.cleanupInactiveUser("userA") err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) { r := in.(*ring.Desc) - delete(r.Ingesters, "0") + delete(r.Ingesters, "ingester-0") return in, true, nil }) @@ -556,16 +558,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) { # HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters. # TYPE cortex_distributor_ingester_append_failures_total counter - cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1 + cortex_distributor_ingester_append_failures_total{ingester="ingester-1",status="2xx",type="metadata"} 1 # HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters. # TYPE cortex_distributor_ingester_appends_total counter - cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1 + cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1 # HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters. # TYPE cortex_distributor_ingester_queries_total counter - cortex_distributor_ingester_queries_total{ingester="1"} 1 + cortex_distributor_ingester_queries_total{ingester="ingester-1"} 1 # HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters. # TYPE cortex_distributor_ingester_query_failures_total counter - cortex_distributor_ingester_query_failures_total{ingester="1"} 1 + cortex_distributor_ingester_query_failures_total{ingester="ingester-1"} 1 `), metrics...)) } @@ -757,23 +759,20 @@ func TestPush_QuorumError(t *testing.T) { err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) { r := in.(*ring.Desc) - ingester2 := r.Ingesters["2"] + ingester2 := r.Ingesters["ingester-2"] ingester2.State = ring.LEFT ingester2.Timestamp = time.Now().Unix() - r.Ingesters["2"] = ingester2 + r.Ingesters["ingester-2"] = ingester2 return in, true, nil }) require.NoError(t, err) // Give time to the ring get updated with the KV value - for { + test.Poll(t, 15*time.Second, true, func() interface{} { replicationSet, _ := r.GetAllHealthy(ring.Read) - if len(replicationSet.Instances) == 2 { - break - } - time.Sleep(100 * time.Millisecond) - } + return len(replicationSet.Instances) == 2 + }) for i := 0; i < numberOfWrites; i++ { request := makeWriteRequest(0, 30, 20, 10) @@ -2717,8 +2716,9 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, [] } else { tokens = []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)} } - addr := fmt.Sprintf("%d", i) - ingesterDescs[addr] = ring.InstanceDesc{ + ingester := fmt.Sprintf("ingester-%d", i) + addr := fmt.Sprintf("ip-ingester-%d", i) + ingesterDescs[ingester] = ring.InstanceDesc{ Addr: addr, Zone: "", State: ring.ACTIVE, diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 882d67bc3d..c81e9f3e77 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -6,6 +6,7 @@ import ( "sort" "time" + "github.com/go-kit/log/level" "github.com/opentracing/opentracing-go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -165,10 +166,16 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe return nil, err } + ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr) + if err != nil { + level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err) + } + resp, err := client.(ingester_client.IngesterClient).QueryExemplars(ctx, req) - d.ingesterQueries.WithLabelValues(ing.Addr).Inc() + + d.ingesterQueries.WithLabelValues(ingesterId).Inc() if err != nil { - d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() + d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() return nil, err } @@ -225,11 +232,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri if err != nil { return nil, err } - d.ingesterQueries.WithLabelValues(ing.Addr).Inc() + + ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr) + if err != nil { + level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err) + } + + d.ingesterQueries.WithLabelValues(ingesterId).Inc() stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req) if err != nil { - d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() + d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() return nil, err } defer stream.CloseSend() //nolint:errcheck @@ -242,7 +255,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri } else if err != nil { // Do not track a failure if the context was canceled. if !grpcutil.IsGRPCContextCanceled(err) { - d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() + d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() } return nil, err diff --git a/pkg/ring/model.go b/pkg/ring/model.go index a801705720..503ab63e69 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -527,6 +527,16 @@ func (d *Desc) getTokensByZone() map[string][]uint32 { return MergeTokensByZone(zones) } +// getInstancesByAddr returns instances id by its address +func (d *Desc) getInstancesByAddr() map[string]string { + instancesByAddMap := make(map[string]string, len(d.Ingesters)) + for id, instance := range d.Ingesters { + instancesByAddMap[instance.Addr] = id + } + + return instancesByAddMap +} + type CompareResult int // CompareResult responses diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 78caa01c4d..38b1d48489 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -80,6 +80,10 @@ type ReadRing interface { // instance does not exist in the ring. GetInstanceState(instanceID string) (InstanceState, error) + // GetInstanceIdByAddr returns the instance id from its address or an error if the + // // instance does not exist in the ring. + GetInstanceIdByAddr(addr string) (string, error) + // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes // all instances that have been part of the identifier's shard since "now - lookbackPeriod". ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing @@ -186,6 +190,8 @@ type Ring struct { // change it is to create a new one and replace it). ringInstanceByToken map[uint32]instanceInfo + ringInstanceIdByAddr map[string]string + // When did a set of instances change the last time (instance changing state or heartbeat is ignored for this timestamp). lastTopologyChange time.Time @@ -338,6 +344,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { ringTokens := ringDesc.GetTokens() ringTokensByZone := ringDesc.getTokensByZone() ringInstanceByToken := ringDesc.getTokensInfo() + ringInstanceByAddr := ringDesc.getInstancesByAddr() ringZones := getZones(ringTokensByZone) r.mtx.Lock() @@ -346,6 +353,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.ringTokens = ringTokens r.ringTokensByZone = ringTokensByZone r.ringInstanceByToken = ringInstanceByToken + r.ringInstanceIdByAddr = ringInstanceByAddr r.ringZones = ringZones r.lastTopologyChange = now if r.shuffledSubringCache != nil { @@ -895,6 +903,15 @@ func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) { return instance.GetState(), nil } +// GetInstanceIdByAddr implements ReadRing. +func (r *Ring) GetInstanceIdByAddr(addr string) (string, error) { + if i, ok := r.ringInstanceIdByAddr[addr]; ok { + return i, nil + } + + return "notFound", ErrInstanceNotFound +} + // HasInstance returns whether the ring contains an instance matching the provided instanceID. func (r *Ring) HasInstance(instanceID string) bool { r.mtx.RLock() diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index 29fac1f2bf..563cc0aa7f 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -17,6 +17,10 @@ type RingMock struct { mock.Mock } +func (r *RingMock) GetInstanceIdByAddr(addr string) (string, error) { + return "", nil +} + func (r *RingMock) Collect(ch chan<- prometheus.Metric) {} func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}