Skip to content

Commit

Permalink
Add max pending request limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
hczhu-db committed Dec 15, 2024
1 parent a26ab5f commit 7336d31
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 27 deletions.
35 changes: 33 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,19 @@ func runReceive(
return errors.Wrap(err, "parse limit configuration")
}
}
limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"), conf.limitsConfigReloadTimer)
if conf.maxPendingGrpcWriteRequests > 0 {
level.Info(logger).Log("msg", "set max pending gRPC write request in limiter", "max_pending_requests", conf.maxPendingGrpcWriteRequests)
}
limiter, err := receive.NewLimiterWithOptions(
conf.writeLimitsConfig,
reg,
receiveMode,
log.With(logger, "component", "receive-limiter"),
conf.limitsConfigReloadTimer,
receive.LimiterOptions{
MaxPendingRequests: int32(conf.maxPendingGrpcWriteRequests),
},
)
if err != nil {
return errors.Wrap(err, "creating limiter")
}
Expand Down Expand Up @@ -408,7 +420,20 @@ func runReceive(
store.LazyRetrieval,
options...,
)
mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
if conf.maxPendingGrpcReadRequests > 0 {
level.Info(logger).Log(
"msg", "set max pending gRPC read request in instrumented store server",
"max_pending_requests", conf.maxPendingGrpcReadRequests,
)
}
mts := store.NewLimitedStoreServerWithOptions(
store.NewInstrumentedStoreServer(reg, proxy),
reg,
conf.storeRateLimits,
store.LimitsOptions{
MaxPendingSeriesRequests: int32(conf.maxPendingGrpcReadRequests),
},
)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
WriteableStoreServer: webHandler,
Expand Down Expand Up @@ -974,6 +999,8 @@ type receiveConfig struct {
topMetricsMinimumCardinality uint64
topMetricsUpdateInterval time.Duration
matcherConverterCacheCapacity int
maxPendingGrpcReadRequests int
maxPendingGrpcWriteRequests int

featureList *[]string
}
Expand Down Expand Up @@ -1138,6 +1165,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
Default("0").IntVar(&rc.matcherConverterCacheCapacity)
cmd.Flag("receive.max-pending-grcp-read-requests", "Throttle gRPC read requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcReadRequests)
cmd.Flag("receive.max-pending-grcp-write-requests", "Throttle gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
}

Expand Down
19 changes: 6 additions & 13 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -142,9 +141,6 @@ type Handler struct {
writeTimeseriesTotal *prometheus.HistogramVec
writeE2eLatency *prometheus.HistogramVec

pendingWriteRequests prometheus.Gauge
pendingWriteRequestsCounter atomic.Int32

Limiter *Limiter
}

Expand Down Expand Up @@ -235,12 +231,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
}, []string{"code", "tenant", "rollup"},
),
pendingWriteRequests: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_pending_write_requests",
Help: "The number of pending write requests.",
},
),
}

h.forwardRequests.WithLabelValues(labelSuccess)
Expand Down Expand Up @@ -1083,12 +1073,15 @@ func quorumReached(successes []int, successThreshold int) bool {

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
if h.Limiter.ShouldRejectNewRequest() {
return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
}
// NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false.
defer h.Limiter.DecrementPendingRequests()

span, ctx := tracing.StartSpan(ctx, "receive_grpc")
defer span.Finish()

h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
defer h.pendingWriteRequestsCounter.Add(-1)

_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
Expand Down
68 changes: 68 additions & 0 deletions pkg/receive/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -35,6 +36,19 @@ type Limiter struct {
configReloadFailedCounter prometheus.Counter
receiverMode ReceiverMode
configReloadTimer time.Duration

// Reject a request if this limit is reached.
// This filed is set at the instance creation and never changes aferwards.

Check failure on line 41 in pkg/receive/limiter.go

View workflow job for this annotation

GitHub Actions / Check misspelled words

aferwards ==> afterwards
// So it's safe to read it without a lock.
maxPendingRequests int32
maxPendingRequestLimitHit prometheus.Counter
pendingRequests atomic.Int32
pendingRequestsGauge prometheus.Gauge
}

type LimiterOptions struct {
// Value 0 disables the max pending request limiting hehavior.
MaxPendingRequests int32
}

// headSeriesLimiter encompasses active/head series limiting logic.
Expand Down Expand Up @@ -62,16 +76,50 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
return l.headSeriesLimiter
}

func (l *Limiter) ShouldRejectNewRequest() bool {
// maxPendingRequests doesn't change once set when a limiter instance is created.
// So, it's safe to read it without a lock.
if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
if l.maxPendingRequestLimitHit != nil {
l.maxPendingRequestLimitHit.Inc()
}
return true
}
newValue := l.pendingRequests.Add(1)
if l.pendingRequestsGauge != nil {
l.pendingRequestsGauge.Set(float64(newValue))
}
return false
}

func (l *Limiter) DecrementPendingRequests() {
newValue := l.pendingRequests.Add(-1)
if l.pendingRequestsGauge != nil {
l.pendingRequestsGauge.Set(float64(newValue))
}
}

// NewLimiter creates a new *Limiter given a configuration and prometheus
// registerer.
func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) {
return NewLimiterWithOptions(configFile, reg, r, logger, configReloadTimer, LimiterOptions{})
}

func NewLimiterWithOptions(
configFile fileContent,
reg prometheus.Registerer,
r ReceiverMode,
logger log.Logger,
configReloadTimer time.Duration,
config LimiterOptions) (*Limiter, error) {
limiter := &Limiter{
writeGate: gate.NewNoop(),
requestLimiter: &noopRequestLimiter{},
headSeriesLimiter: NewNopSeriesLimit(),
logger: logger,
receiverMode: r,
configReloadTimer: configReloadTimer,
maxPendingRequests: config.MaxPendingRequests,
}

if reg != nil {
Expand All @@ -92,6 +140,26 @@ func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMod
Help: "How many times the limit configuration failed to reload.",
},
)
limiter.configReloadFailedCounter = promauto.With(limiter.registerer).NewCounter(
prometheus.CounterOpts{
Namespace: "thanos",
Subsystem: "receive",
Name: "limits_config_reload_err_total",
Help: "How many times the limit configuration failed to reload.",
},
)
limiter.maxPendingRequestLimitHit = promauto.With(limiter.registerer).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_max_pending_write_request_limit_hit_total",
Help: "Number of times the max pending write request limit was hit",
},
)
limiter.pendingRequestsGauge = promauto.With(limiter.registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_pending_write_requests",
Help: "Number of pending write requests",
},
)
}

if configFile == nil {
Expand Down
38 changes: 38 additions & 0 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -123,10 +125,31 @@ type limitedStoreServer struct {
newSeriesLimiter SeriesLimiterFactory
newSamplesLimiter ChunksLimiterFactory
failedRequestsCounter *prometheus.CounterVec

// This is a read-only field once it's set.
// Value 0 disalbes the feature.

Check failure on line 130 in pkg/store/limiter.go

View workflow job for this annotation

GitHub Actions / Check misspelled words

disalbes ==> disables
maxPendingRequests int32
pendingRequests atomic.Int32
maxPendingRequestLimitHit prometheus.Counter
pendingRequestsGauge prometheus.Gauge
}

type LimitsOptions struct {
// Value 0 disables the feature.
MaxPendingSeriesRequests int32
}

// NewLimitedStoreServer creates a new limitedStoreServer.
func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer {
return NewLimitedStoreServerWithOptions(store, reg, selectLimits, LimitsOptions{})
}

func NewLimitedStoreServerWithOptions(
store storepb.StoreServer,
reg prometheus.Registerer,
selectLimits SeriesSelectLimits,
opts LimitsOptions,
) storepb.StoreServer {
return &limitedStoreServer{
StoreServer: store,
newSeriesLimiter: NewSeriesLimiterFactory(selectLimits.SeriesPerRequest),
Expand All @@ -135,10 +158,25 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
Name: "thanos_store_selects_dropped_total",
Help: "Number of select queries that were dropped due to configured limits.",
}, []string{"reason"}),
pendingRequestsGauge: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_store_server_pending_series_requests",
Help: "Number of pending series requests",
}),
maxPendingRequestLimitHit: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_store_server_hit_max_pending_series_request_limit",
Help: "Number of pending series requests that hit the max pending request limit",
}),
maxPendingRequests: opts.MaxPendingSeriesRequests,
}
}

func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
return status.Error(codes.ResourceExhausted, "too many pending series requests")
}
s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
defer s.pendingRequests.Add(-1)

seriesLimiter := s.newSeriesLimiter(s.failedRequestsCounter.WithLabelValues("series"))
chunksLimiter := s.newSamplesLimiter(s.failedRequestsCounter.WithLabelValues("chunks"))
limitedSrv := newLimitedServer(srv, seriesLimiter, chunksLimiter)
Expand Down
12 changes: 0 additions & 12 deletions pkg/store/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/store/storepb"

"go.uber.org/atomic"
)

// seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
Expand Down Expand Up @@ -157,12 +155,8 @@ type instrumentedStoreServer struct {
storepb.StoreServer
seriesRequested prometheus.Histogram
chunksRequested prometheus.Histogram

pendingRequests prometheus.Gauge
pendingRequestsCounter atomic.Int32
}

// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreServer) storepb.StoreServer {
return &instrumentedStoreServer{
StoreServer: store,
Expand All @@ -176,17 +170,11 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
Help: "Number of requested chunks for Series calls",
Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
}),
pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_store_server_pending_series_requests",
Help: "Number of pending series requests",
}),
}
}

func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
instrumented := newInstrumentedServer(srv)
s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
defer s.pendingRequestsCounter.Add(-1)

if err := s.StoreServer.Series(req, instrumented); err != nil {
return err
Expand Down

0 comments on commit 7336d31

Please sign in to comment.