Skip to content

Commit

Permalink
Using ingester id instead of address on the distributor metrics. (cortexproject#6078)
Browse files Browse the repository at this point in the history

* Using ingester ID on the distributor metrics

Signed-off-by: alanprot <[email protected]>

* fix test

Signed-off-by: alanprot <[email protected]>

* fix TestPush_QuorumError test

Signed-off-by: alanprot <[email protected]>

* Changelog

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Jul 11, 2024
1 parent ee8d110 commit 505cdc2
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
* [CHANGE] Ingester: Remove `-querier.query-store-for-labels-enabled` flag. Querying long-term store for labels is always enabled. #5984
* [CHANGE] Server: Instrument `cortex_request_duration_seconds` metric with native histogram. If `native-histograms` feature is enabled in monitoring Prometheus then the metric name needs to be updated in your dashboards. #6056
* [CHANGE] Distributor/Ingester: Change `cortex_distributor_ingester_appends_total`, `cortex_distributor_ingester_append_failures_total`, `cortex_distributor_ingester_queries_total`, and `cortex_distributor_ingester_query_failures_total` metrics to use the ingester ID instead of its IP as the label value. #6078
* [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
Expand Down
4 changes: 4 additions & 0 deletions pkg/compactor/shuffle_sharding_grouper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,10 @@ type RingMock struct {
mock.Mock
}

// GetInstanceIdByAddr implements the ring.ReadRing interface for the mock.
// It returns an empty instance id and a nil error for every address.
func (r *RingMock) GetInstanceIdByAddr(addr string) (string, error) {
	return "", nil
}

func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}
Expand Down
22 changes: 15 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,10 +734,12 @@ func (d *Distributor) cleanStaleIngesterMetrics() {
return
}

ipsMap := map[string]struct{}{}
idsMap := map[string]struct{}{}

for _, ing := range append(healthy, unhealthy...) {
ipsMap[ing.Addr] = struct{}{}
if id, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr); err == nil {
idsMap[id] = struct{}{}
}
}

ingesterMetrics := []*prometheus.CounterVec{d.ingesterAppends, d.ingesterAppendFailures, d.ingesterQueries, d.ingesterQueryFailures}
Expand All @@ -751,7 +753,7 @@ func (d *Distributor) cleanStaleIngesterMetrics() {
}

for _, lbls := range metrics {
if _, ok := ipsMap[lbls.Get("ingester")]; !ok {
if _, ok := idsMap[lbls.Get("ingester")]; !ok {
err := util.DeleteMatchingLabels(m, map[string]string{"ingester": lbls.Get("ingester")})
if err != nil {
level.Warn(d.log).Log("msg", "error cleaning metrics: DeleteMatchingLabels", "err", err)
Expand Down Expand Up @@ -956,6 +958,12 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
if err != nil {
return err
}

id, err := d.ingestersRing.GetInstanceIdByAddr(ingester.Addr)
if err != nil {
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ingester.Addr, "err", err)
}

c := h.(ingester_client.HealthAndIngesterClient)

req := cortexpb.PreallocWriteRequestFromPool()
Expand All @@ -972,15 +980,15 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
}

if len(metadata) > 0 {
d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata, getErrorStatus(err)).Inc()
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
}
}
if len(timeseries) > 0 {
d.ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
d.ingesterAppends.WithLabelValues(id, typeSamples).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples, getErrorStatus(err)).Inc()
d.ingesterAppendFailures.WithLabelValues(id, typeSamples, getErrorStatus(err)).Inc()
}
}

Expand Down
90 changes: 45 additions & 45 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,12 @@ func TestDistributor_Push(t *testing.T) {
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="samples"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="5xx",type="samples"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="samples"} 1
`,
},
"A push to ingesters should report the correct metrics with no samples": {
Expand All @@ -251,12 +251,12 @@ func TestDistributor_Push(t *testing.T) {
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="5xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="metadata"} 1
`,
},
"A push to overloaded ingesters should report the correct metrics": {
Expand All @@ -268,14 +268,14 @@ func TestDistributor_Push(t *testing.T) {
expectedResponse: emptyResponse,
ingesterError: httpgrpc.Errorf(http.StatusTooManyRequests, "Fail"),
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",status="4xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="metadata"} 1
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="4xx",type="metadata"} 1
`,
},
"A push to 3 happy ingesters should succeed, histograms": {
Expand Down Expand Up @@ -436,14 +436,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)

h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
d.ingesterAppends.WithLabelValues(h[0].Addr, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(h[0].Addr, typeMetadata, "2xx").Inc()
d.ingesterAppends.WithLabelValues(h[1].Addr, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(h[1].Addr, typeMetadata, "2xx").Inc()
d.ingesterQueries.WithLabelValues(h[0].Addr).Inc()
d.ingesterQueries.WithLabelValues(h[1].Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(h[0].Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(h[1].Addr).Inc()
ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr)
ingId1, _ := r.GetInstanceIdByAddr(h[1].Addr)
d.ingesterAppends.WithLabelValues(ingId0, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(ingId0, typeMetadata, "2xx").Inc()
d.ingesterAppends.WithLabelValues(ingId1, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(ingId1, typeMetadata, "2xx").Inc()
d.ingesterQueries.WithLabelValues(ingId0).Inc()
d.ingesterQueries.WithLabelValues(ingId1).Inc()
d.ingesterQueryFailures.WithLabelValues(ingId0).Inc()
d.ingesterQueryFailures.WithLabelValues(ingId1).Inc()

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
Expand Down Expand Up @@ -489,27 +491,27 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="0",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-0",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-1",status="2xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
# TYPE cortex_distributor_ingester_queries_total counter
cortex_distributor_ingester_queries_total{ingester="0"} 1
cortex_distributor_ingester_queries_total{ingester="1"} 1
cortex_distributor_ingester_queries_total{ingester="ingester-0"} 1
cortex_distributor_ingester_queries_total{ingester="ingester-1"} 1
# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
# TYPE cortex_distributor_ingester_query_failures_total counter
cortex_distributor_ingester_query_failures_total{ingester="0"} 1
cortex_distributor_ingester_query_failures_total{ingester="1"} 1
cortex_distributor_ingester_query_failures_total{ingester="ingester-0"} 1
cortex_distributor_ingester_query_failures_total{ingester="ingester-1"} 1
`), metrics...))

d.cleanupInactiveUser("userA")

err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
r := in.(*ring.Desc)
delete(r.Ingesters, "0")
delete(r.Ingesters, "ingester-0")
return in, true, nil
})

Expand Down Expand Up @@ -556,16 +558,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-1",status="2xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
# TYPE cortex_distributor_ingester_queries_total counter
cortex_distributor_ingester_queries_total{ingester="1"} 1
cortex_distributor_ingester_queries_total{ingester="ingester-1"} 1
# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
# TYPE cortex_distributor_ingester_query_failures_total counter
cortex_distributor_ingester_query_failures_total{ingester="1"} 1
cortex_distributor_ingester_query_failures_total{ingester="ingester-1"} 1
`), metrics...))
}

Expand Down Expand Up @@ -757,23 +759,20 @@ func TestPush_QuorumError(t *testing.T) {

err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
r := in.(*ring.Desc)
ingester2 := r.Ingesters["2"]
ingester2 := r.Ingesters["ingester-2"]
ingester2.State = ring.LEFT
ingester2.Timestamp = time.Now().Unix()
r.Ingesters["2"] = ingester2
r.Ingesters["ingester-2"] = ingester2
return in, true, nil
})

require.NoError(t, err)

// Give time to the ring get updated with the KV value
for {
test.Poll(t, 15*time.Second, true, func() interface{} {
replicationSet, _ := r.GetAllHealthy(ring.Read)
if len(replicationSet.Instances) == 2 {
break
}
time.Sleep(100 * time.Millisecond)
}
return len(replicationSet.Instances) == 2
})

for i := 0; i < numberOfWrites; i++ {
request := makeWriteRequest(0, 30, 20, 10)
Expand Down Expand Up @@ -2717,8 +2716,9 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
} else {
tokens = []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}
}
addr := fmt.Sprintf("%d", i)
ingesterDescs[addr] = ring.InstanceDesc{
ingester := fmt.Sprintf("ingester-%d", i)
addr := fmt.Sprintf("ip-ingester-%d", i)
ingesterDescs[ingester] = ring.InstanceDesc{
Addr: addr,
Zone: "",
State: ring.ACTIVE,
Expand Down
23 changes: 18 additions & 5 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sort"
"time"

"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -165,10 +166,16 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
return nil, err
}

ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr)
if err != nil {
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err)
}

resp, err := client.(ingester_client.IngesterClient).QueryExemplars(ctx, req)
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()

d.ingesterQueries.WithLabelValues(ingesterId).Inc()
if err != nil {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
return nil, err
}

Expand Down Expand Up @@ -225,11 +232,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
if err != nil {
return nil, err
}
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()

ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr)
if err != nil {
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err)
}

d.ingesterQueries.WithLabelValues(ingesterId).Inc()

stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req)
if err != nil {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
return nil, err
}
defer stream.CloseSend() //nolint:errcheck
Expand All @@ -242,7 +255,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
} else if err != nil {
// Do not track a failure if the context was canceled.
if !grpcutil.IsGRPCContextCanceled(err) {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
}

return nil, err
Expand Down
10 changes: 10 additions & 0 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,16 @@ func (d *Desc) getTokensByZone() map[string][]uint32 {
return MergeTokensByZone(zones)
}

// getInstancesByAddr returns instances id by its address
// getInstancesByAddr returns a map from instance address to instance id,
// built from the ring's ingester descriptors.
func (d *Desc) getInstancesByAddr() map[string]string {
	byAddr := make(map[string]string, len(d.Ingesters))
	for instanceID, desc := range d.Ingesters {
		byAddr[desc.Addr] = instanceID
	}
	return byAddr
}

type CompareResult int

// CompareResult responses
Expand Down
17 changes: 17 additions & 0 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type ReadRing interface {
// instance does not exist in the ring.
GetInstanceState(instanceID string) (InstanceState, error)

// GetInstanceIdByAddr returns the instance id from its address or an error if the
// instance does not exist in the ring.
GetInstanceIdByAddr(addr string) (string, error)

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
Expand Down Expand Up @@ -186,6 +190,8 @@ type Ring struct {
// change it is to create a new one and replace it).
ringInstanceByToken map[uint32]instanceInfo

ringInstanceIdByAddr map[string]string

// When did a set of instances change the last time (instance changing state or heartbeat is ignored for this timestamp).
lastTopologyChange time.Time

Expand Down Expand Up @@ -338,6 +344,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
ringTokens := ringDesc.GetTokens()
ringTokensByZone := ringDesc.getTokensByZone()
ringInstanceByToken := ringDesc.getTokensInfo()
ringInstanceByAddr := ringDesc.getInstancesByAddr()
ringZones := getZones(ringTokensByZone)

r.mtx.Lock()
Expand All @@ -346,6 +353,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.ringTokens = ringTokens
r.ringTokensByZone = ringTokensByZone
r.ringInstanceByToken = ringInstanceByToken
r.ringInstanceIdByAddr = ringInstanceByAddr
r.ringZones = ringZones
r.lastTopologyChange = now
if r.shuffledSubringCache != nil {
Expand Down Expand Up @@ -895,6 +903,15 @@ func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) {
return instance.GetState(), nil
}

// GetInstanceIdByAddr implements ReadRing.
func (r *Ring) GetInstanceIdByAddr(addr string) (string, error) {
if i, ok := r.ringInstanceIdByAddr[addr]; ok {
return i, nil
}

return "notFound", ErrInstanceNotFound
}

// HasInstance returns whether the ring contains an instance matching the provided instanceID.
func (r *Ring) HasInstance(instanceID string) bool {
r.mtx.RLock()
Expand Down
4 changes: 4 additions & 0 deletions pkg/ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type RingMock struct {
mock.Mock
}

// GetInstanceIdByAddr implements the ReadRing interface for the mock.
// It returns an empty instance id and a nil error for every address.
func (r *RingMock) GetInstanceIdByAddr(addr string) (string, error) {
	return "", nil
}

func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}
Expand Down

0 comments on commit 505cdc2

Please sign in to comment.