diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index 62308285fd..26a1bfcbc5 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -36,6 +36,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/atomic"
 	"golang.org/x/exp/slices"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -141,6 +142,9 @@ type Handler struct {
 	writeTimeseriesTotal *prometheus.HistogramVec
 	writeE2eLatency      *prometheus.HistogramVec
 
+	pendingWriteRequests        prometheus.Gauge
+	pendingWriteRequestsCounter atomic.Int32
+
 	Limiter *Limiter
 }
 
@@ -231,6 +235,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 				Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
 			},
 			[]string{"code", "tenant", "rollup"},
 		),
+		pendingWriteRequests: promauto.With(registerer).NewGauge(
+			prometheus.GaugeOpts{
+				Name: "thanos_receive_pending_write_requests",
+				Help: "The number of pending write requests.",
+			},
+		),
 	}
 
 	h.forwardRequests.WithLabelValues(labelSuccess)
@@ -1076,6 +1086,9 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
 	span, ctx := tracing.StartSpan(ctx, "receive_grpc")
 	defer span.Finish()
 
+	h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
+	defer h.pendingWriteRequestsCounter.Add(-1)
+
 	_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
 	if err != nil {
 		level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
diff --git a/pkg/store/telemetry.go b/pkg/store/telemetry.go
index 135daf85a0..8d7da11798 100644
--- a/pkg/store/telemetry.go
+++ b/pkg/store/telemetry.go
@@ -11,6 +11,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 
 	"github.com/thanos-io/thanos/pkg/store/storepb"
+
+	"go.uber.org/atomic"
 )
 
 // seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
@@ -155,6 +157,9 @@ type instrumentedStoreServer struct {
 	storepb.StoreServer
 	seriesRequested prometheus.Histogram
 	chunksRequested prometheus.Histogram
+
+	pendingRequests        prometheus.Gauge
+	pendingRequestsCounter atomic.Int32
 }
 
 // NewInstrumentedStoreServer creates a new instrumentedStoreServer.
@@ -171,11 +176,18 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
 			Help:    "Number of requested chunks for Series calls",
 			Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
 		}),
+		pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Name: "thanos_store_server_pending_series_requests",
+			Help: "Number of pending series requests",
+		}),
 	}
 }
 
 func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
 	instrumented := newInstrumentedServer(srv)
+	s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
+	defer s.pendingRequestsCounter.Add(-1)
+
 	if err := s.StoreServer.Series(req, instrumented); err != nil {
 		return err
 	}
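
For context, both hunks introduce the same pattern: an atomic in-flight counter whose new value is mirrored into a Prometheus gauge when a request starts, with the counter decremented when the request finishes. Below is a minimal, self-contained sketch of that pattern, not Thanos code; the metric name and the pendingGauge, pendingCounter, and handleSomething identifiers are illustrative stand-ins.

// Minimal sketch of the in-flight gauge pattern used in both hunks above.
// This is not Thanos code: the metric name, pendingGauge, pendingCounter,
// and handleSomething are illustrative stand-ins.
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/atomic"
)

var (
	// Gauge exposed to Prometheus; fed from the atomic counter below.
	pendingGauge = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "example_pending_requests",
		Help: "Number of requests currently being handled.",
	})
	// Source of truth for the in-flight count; safe for concurrent handlers.
	pendingCounter atomic.Int32
)

func handleSomething(w http.ResponseWriter, r *http.Request) {
	// Bump the counter and mirror its new value into the gauge.
	pendingGauge.Set(float64(pendingCounter.Add(1)))
	// Only the counter is decremented on exit, mirroring the diff above.
	defer pendingCounter.Add(-1)

	_, _ = w.Write([]byte("ok"))
}

func main() {
	http.HandleFunc("/work", handleSomething)
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
}

As in the diff, the gauge is written only when a request starts, so it reports the in-flight count observed at the most recent request; a plain gauge driven by Inc()/Dec() would also fall on request completion, at the cost of a second gauge operation per request.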