diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index eafa0bd816..a29deea086 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -420,20 +420,8 @@ func runReceive( store.LazyRetrieval, options..., ) - if conf.maxPendingGrpcReadRequests > 0 { - level.Info(logger).Log( - "msg", "set max pending gRPC read request in instrumented store server", - "max_pending_requests", conf.maxPendingGrpcReadRequests, - ) - } - mts := store.NewLimitedStoreServerWithOptions( - store.NewInstrumentedStoreServer(reg, proxy), - reg, - conf.storeRateLimits, - store.LimitsOptions{ - MaxPendingSeriesRequests: int32(conf.maxPendingGrpcReadRequests), - }, - ) + + mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits) rw := store.ReadWriteTSDBStore{ StoreServer: mts, WriteableStoreServer: webHandler, @@ -999,7 +987,6 @@ type receiveConfig struct { topMetricsMinimumCardinality uint64 topMetricsUpdateInterval time.Duration matcherConverterCacheCapacity int - maxPendingGrpcReadRequests int maxPendingGrpcWriteRequests int featureList *[]string @@ -1165,9 +1152,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { Default("5m").DurationVar(&rc.topMetricsUpdateInterval) cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0."). Default("0").IntVar(&rc.matcherConverterCacheCapacity) - cmd.Flag("receive.max-pending-grcp-read-requests", "Throttle gRPC read requests when this number of requests are pending. Value 0 disables this feature."). - Default("0").IntVar(&rc.maxPendingGrpcReadRequests) - cmd.Flag("receive.max-pending-grcp-write-requests", "Throttle gRPC write requests when this number of requests are pending. Value 0 disables this feature."). + cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature."). Default("0").IntVar(&rc.maxPendingGrpcWriteRequests) rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings() } diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index 35c2f5acca..9777df3be4 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -112,7 +112,7 @@ func NewLimiterWithOptions( r ReceiverMode, logger log.Logger, configReloadTimer time.Duration, - config LimiterOptions) (*Limiter, error) { + opts LimiterOptions) (*Limiter, error) { limiter := &Limiter{ writeGate: gate.NewNoop(), requestLimiter: &noopRequestLimiter{}, @@ -120,7 +120,7 @@ func NewLimiterWithOptions( logger: logger, receiverMode: r, configReloadTimer: configReloadTimer, - maxPendingRequests: config.MaxPendingRequests, + maxPendingRequests: opts.MaxPendingRequests, } if reg != nil { diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 08b829a02d..ee19c1f480 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -110,11 +110,14 @@ func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory { type SeriesSelectLimits struct { SeriesPerRequest uint64 SamplesPerRequest uint64 + PendingRequests int32 } func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) { cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest) cmd.Flag("store.limits.request-samples", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest) + cmd.Flag("store.limits.pending-requests", "Reject gRPC series requests right away when this number of requests are pending. Value 0 disables this feature."). + Default("0").Int32Var(&l.PendingRequests) } var _ storepb.StoreServer = &limitedStoreServer{} @@ -134,22 +137,8 @@ type limitedStoreServer struct { pendingRequestsGauge prometheus.Gauge } -type LimitsOptions struct { - // Value 0 disables the feature. - MaxPendingSeriesRequests int32 -} - // NewLimitedStoreServer creates a new limitedStoreServer. func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer { - return NewLimitedStoreServerWithOptions(store, reg, selectLimits, LimitsOptions{}) -} - -func NewLimitedStoreServerWithOptions( - store storepb.StoreServer, - reg prometheus.Registerer, - selectLimits SeriesSelectLimits, - opts LimitsOptions, -) storepb.StoreServer { return &limitedStoreServer{ StoreServer: store, newSeriesLimiter: NewSeriesLimiterFactory(selectLimits.SeriesPerRequest), @@ -166,7 +155,7 @@ func NewLimitedStoreServerWithOptions( Name: "thanos_store_server_hit_max_pending_series_requests_total", Help: "Number of pending series requests that hit the max pending request limit", }), - maxPendingRequests: opts.MaxPendingSeriesRequests, + maxPendingRequests: selectLimits.PendingRequests, } }