diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index b1220e6fb2..eafa0bd816 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -268,7 +268,19 @@ func runReceive(
 			return errors.Wrap(err, "parse limit configuration")
 		}
 	}
-	limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"), conf.limitsConfigReloadTimer)
+	if conf.maxPendingGrpcWriteRequests > 0 {
+		level.Info(logger).Log("msg", "set max pending gRPC write requests in limiter", "max_pending_requests", conf.maxPendingGrpcWriteRequests)
+	}
+	limiter, err := receive.NewLimiterWithOptions(
+		conf.writeLimitsConfig,
+		reg,
+		receiveMode,
+		log.With(logger, "component", "receive-limiter"),
+		conf.limitsConfigReloadTimer,
+		receive.LimiterOptions{
+			MaxPendingRequests: int32(conf.maxPendingGrpcWriteRequests),
+		},
+	)
 	if err != nil {
 		return errors.Wrap(err, "creating limiter")
 	}
@@ -408,7 +420,20 @@ func runReceive(
 		store.LazyRetrieval,
 		options...,
 	)
-	mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
+	if conf.maxPendingGrpcReadRequests > 0 {
+		level.Info(logger).Log(
+			"msg", "set max pending gRPC read requests in limited store server",
+			"max_pending_requests", conf.maxPendingGrpcReadRequests,
+		)
+	}
+	mts := store.NewLimitedStoreServerWithOptions(
+		store.NewInstrumentedStoreServer(reg, proxy),
+		reg,
+		conf.storeRateLimits,
+		store.LimitsOptions{
+			MaxPendingSeriesRequests: int32(conf.maxPendingGrpcReadRequests),
+		},
+	)
 	rw := store.ReadWriteTSDBStore{
 		StoreServer:          mts,
 		WriteableStoreServer: webHandler,
@@ -974,6 +999,8 @@ type receiveConfig struct {
 	topMetricsMinimumCardinality  uint64
 	topMetricsUpdateInterval      time.Duration
 	matcherConverterCacheCapacity int
+	maxPendingGrpcReadRequests    int
+	maxPendingGrpcWriteRequests   int
 
 	featureList *[]string
 }
@@ -1138,6 +1165,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
 	cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
 		Default("0").IntVar(&rc.matcherConverterCacheCapacity)
+	cmd.Flag("receive.max-pending-grpc-read-requests", "Reject gRPC read requests when this many requests are already pending. Value 0 disables this feature.").
+		Default("0").IntVar(&rc.maxPendingGrpcReadRequests)
+	cmd.Flag("receive.max-pending-grpc-write-requests", "Reject gRPC write requests when this many requests are already pending. Value 0 disables this feature.").
+		Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
 	rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
 }
 
diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index 26a1bfcbc5..b6944efa44 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -36,7 +36,6 @@ import (
 	"github.com/prometheus/prometheus/tsdb"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
-	"go.uber.org/atomic"
 	"golang.org/x/exp/slices"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -142,9 +141,6 @@ type Handler struct {
 	writeTimeseriesTotal *prometheus.HistogramVec
 	writeE2eLatency      *prometheus.HistogramVec
 
-	pendingWriteRequests        prometheus.Gauge
-	pendingWriteRequestsCounter atomic.Int32
-
 	Limiter *Limiter
 }
 
@@ -235,12 +231,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 				Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
 			}, []string{"code", "tenant", "rollup"},
 		),
-		pendingWriteRequests: promauto.With(registerer).NewGauge(
-			prometheus.GaugeOpts{
-				Name: "thanos_receive_pending_write_requests",
-				Help: "The number of pending write requests.",
-			},
-		),
 	}
 
 	h.forwardRequests.WithLabelValues(labelSuccess)
@@ -1083,12 +1073,15 @@ func quorumReached(successes []int, successThreshold int) bool {
 
 // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
 func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
+	if h.Limiter.ShouldRejectNewRequest() {
+		return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
+	}
+	// ShouldRejectNewRequest increments the pending-request count only when it admits the request, so the deferred decrement stays balanced.
+	defer h.Limiter.DecrementPendingRequests()
+
 	span, ctx := tracing.StartSpan(ctx, "receive_grpc")
 	defer span.Finish()
 
-	h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
-	defer h.pendingWriteRequestsCounter.Add(-1)
-
 	_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
 	if err != nil {
 		level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go
index 46055b3001..35c2f5acca 100644
--- a/pkg/receive/limiter.go
+++ b/pkg/receive/limiter.go
@@ -18,6 +18,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/gate"
+
+	"go.uber.org/atomic"
 )
 
 // Limiter is responsible for managing the configuration and initialization of
@@ -35,6 +37,19 @@ type Limiter struct {
 	configReloadFailedCounter prometheus.Counter
 	receiverMode              ReceiverMode
 	configReloadTimer         time.Duration
+
+	// Reject new write requests once this many requests are pending.
+	// This field is set at instance creation and never changes afterwards,
+	// so it is safe to read it without a lock.
+	maxPendingRequests        int32
+	maxPendingRequestLimitHit prometheus.Counter
+	pendingRequests           atomic.Int32
+	pendingRequestsGauge      prometheus.Gauge
+}
+
+type LimiterOptions struct {
+	// Value 0 disables the max pending request limiting behavior.
+	MaxPendingRequests int32
 }
 
 // headSeriesLimiter encompasses active/head series limiting logic.
@@ -62,16 +77,50 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
 	return l.headSeriesLimiter
 }
 
+func (l *Limiter) ShouldRejectNewRequest() bool {
+	// maxPendingRequests is set once when the limiter is created and never changes,
+	// so it is safe to read it without a lock.
+	if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
+		if l.maxPendingRequestLimitHit != nil {
+			l.maxPendingRequestLimitHit.Inc()
+		}
+		return true
+	}
+	newValue := l.pendingRequests.Add(1)
+	if l.pendingRequestsGauge != nil {
+		l.pendingRequestsGauge.Set(float64(newValue))
+	}
+	return false
+}
+
+func (l *Limiter) DecrementPendingRequests() {
+	newValue := l.pendingRequests.Add(-1)
+	if l.pendingRequestsGauge != nil {
+		l.pendingRequestsGauge.Set(float64(newValue))
+	}
+}
+
 // NewLimiter creates a new *Limiter given a configuration and prometheus
 // registerer.
 func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) {
+	return NewLimiterWithOptions(configFile, reg, r, logger, configReloadTimer, LimiterOptions{})
+}
+
+func NewLimiterWithOptions(
+	configFile fileContent,
+	reg prometheus.Registerer,
+	r ReceiverMode,
+	logger log.Logger,
+	configReloadTimer time.Duration,
+	config LimiterOptions) (*Limiter, error) {
 	limiter := &Limiter{
-		writeGate:         gate.NewNoop(),
-		requestLimiter:    &noopRequestLimiter{},
-		headSeriesLimiter: NewNopSeriesLimit(),
-		logger:            logger,
-		receiverMode:      r,
-		configReloadTimer: configReloadTimer,
+		writeGate:          gate.NewNoop(),
+		requestLimiter:     &noopRequestLimiter{},
+		headSeriesLimiter:  NewNopSeriesLimit(),
+		logger:             logger,
+		receiverMode:       r,
+		configReloadTimer:  configReloadTimer,
+		maxPendingRequests: config.MaxPendingRequests,
 	}
 
 	if reg != nil {
@@ -92,6 +141,18 @@ func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMod
 				Help:      "How many times the limit configuration failed to reload.",
 			},
 		)
+		limiter.maxPendingRequestLimitHit = promauto.With(limiter.registerer).NewCounter(
+			prometheus.CounterOpts{
+				Name: "thanos_receive_max_pending_write_request_limit_hit_total",
+				Help: "Number of times the max pending write request limit was hit.",
+			},
+		)
+		limiter.pendingRequestsGauge = promauto.With(limiter.registerer).NewGauge(
+			prometheus.GaugeOpts{
+				Name: "thanos_receive_pending_write_requests",
+				Help: "Number of pending write requests.",
+			},
+		)
 	}
 
 	if configFile == nil {
diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go
index 993330cc85..08b829a02d 100644
--- a/pkg/store/limiter.go
+++ b/pkg/store/limiter.go
@@ -11,6 +11,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"go.uber.org/atomic"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/thanos-io/thanos/pkg/extkingpin"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -123,10 +125,31 @@ type limitedStoreServer struct {
 	newSeriesLimiter      SeriesLimiterFactory
 	newSamplesLimiter     ChunksLimiterFactory
 	failedRequestsCounter *prometheus.CounterVec
+
+	// maxPendingRequests is read-only once set.
+	// Value 0 disables the feature.
+	maxPendingRequests        int32
+	pendingRequests           atomic.Int32
+	maxPendingRequestLimitHit prometheus.Counter
+	pendingRequestsGauge      prometheus.Gauge
+}
+
+type LimitsOptions struct {
+	// Value 0 disables the feature.
+	MaxPendingSeriesRequests int32
 }
 
 // NewLimitedStoreServer creates a new limitedStoreServer.
 func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer {
+	return NewLimitedStoreServerWithOptions(store, reg, selectLimits, LimitsOptions{})
+}
+
+func NewLimitedStoreServerWithOptions(
+	store storepb.StoreServer,
+	reg prometheus.Registerer,
+	selectLimits SeriesSelectLimits,
+	opts LimitsOptions,
+) storepb.StoreServer {
 	return &limitedStoreServer{
 		StoreServer:       store,
 		newSeriesLimiter:  NewSeriesLimiterFactory(selectLimits.SeriesPerRequest),
@@ -135,10 +158,26 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
 			Name: "thanos_store_selects_dropped_total",
 			Help: "Number of select queries that were dropped due to configured limits.",
 		}, []string{"reason"}),
+		pendingRequestsGauge: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Name: "thanos_store_server_pending_series_requests",
+			Help: "Number of pending series requests.",
+		}),
+		maxPendingRequestLimitHit: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "thanos_store_server_hit_max_pending_series_requests_total",
+			Help: "Number of series requests rejected due to the max pending series request limit.",
+		}),
+		maxPendingRequests: opts.MaxPendingSeriesRequests,
 	}
 }
 
 func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
+	if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
+		s.maxPendingRequestLimitHit.Inc()
+		return status.Error(codes.ResourceExhausted, "too many pending series requests")
+	}
+	s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
+	defer func() { s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(-1))) }()
+
 	seriesLimiter := s.newSeriesLimiter(s.failedRequestsCounter.WithLabelValues("series"))
 	chunksLimiter := s.newSamplesLimiter(s.failedRequestsCounter.WithLabelValues("chunks"))
 	limitedSrv := newLimitedServer(srv, seriesLimiter, chunksLimiter)
diff --git a/pkg/store/telemetry.go b/pkg/store/telemetry.go
index 8d7da11798..f9590826a1 100644
--- a/pkg/store/telemetry.go
+++ b/pkg/store/telemetry.go
@@ -11,8 +11,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 
 	"github.com/thanos-io/thanos/pkg/store/storepb"
-
-	"go.uber.org/atomic"
 )
 
 // seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
@@ -157,12 +155,9 @@ type instrumentedStoreServer struct {
 	storepb.StoreServer
 	seriesRequested prometheus.Histogram
 	chunksRequested prometheus.Histogram
-
-	pendingRequests        prometheus.Gauge
-	pendingRequestsCounter atomic.Int32
 }
 
 // NewInstrumentedStoreServer creates a new instrumentedStoreServer.
 func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreServer) storepb.StoreServer {
 	return &instrumentedStoreServer{
 		StoreServer: store,
@@ -176,17 +171,11 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
 			Help:    "Number of requested chunks for Series calls",
 			Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
 		}),
-		pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
-			Name: "thanos_store_server_pending_series_requests",
-			Help: "Number of pending series requests",
-		}),
 	}
 }
 
 func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
 	instrumented := newInstrumentedServer(srv)
-	s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
-	defer s.pendingRequestsCounter.Add(-1)
 
 	if err := s.StoreServer.Series(req, instrumented); err != nil {
 		return err
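Usage sketch (not part of the patch): a minimal unit test of the new pending-request guard, assuming it lives in the receive package, that the nil config-file/registerer paths behave as in the code above, and that RouterIngestor is an existing ReceiverMode constant; the test name is illustrative.

package receive

import (
	"testing"

	"github.com/go-kit/log"
)

func TestLimiterPendingRequestGuard(t *testing.T) {
	// MaxPendingRequests: 1 admits a single in-flight request and rejects the next one.
	l, err := NewLimiterWithOptions(nil, nil, RouterIngestor, log.NewNopLogger(), 0, LimiterOptions{MaxPendingRequests: 1})
	if err != nil {
		t.Fatal(err)
	}
	if l.ShouldRejectNewRequest() {
		t.Fatal("first request should be admitted")
	}
	if !l.ShouldRejectNewRequest() {
		t.Fatal("second concurrent request should be rejected")
	}
	// Finishing the admitted request frees the slot again.
	l.DecrementPendingRequests()
	if l.ShouldRejectNewRequest() {
		t.Fatal("request should be admitted again after the pending count drops")
	}
}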