Skip to content

Commit

Permalink
Move store max_pending_requests to limits config
Browse files Browse the repository at this point in the history
  • Loading branch information
hczhu-db committed Dec 17, 2024
1 parent 2651ebf commit cb958ba
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 35 deletions.
21 changes: 3 additions & 18 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,20 +420,8 @@ func runReceive(
store.LazyRetrieval,
options...,
)
if conf.maxPendingGrpcReadRequests > 0 {
level.Info(logger).Log(
"msg", "set max pending gRPC read request in instrumented store server",
"max_pending_requests", conf.maxPendingGrpcReadRequests,
)
}
mts := store.NewLimitedStoreServerWithOptions(
store.NewInstrumentedStoreServer(reg, proxy),
reg,
conf.storeRateLimits,
store.LimitsOptions{
MaxPendingSeriesRequests: int32(conf.maxPendingGrpcReadRequests),
},
)

mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
WriteableStoreServer: webHandler,
Expand Down Expand Up @@ -999,7 +987,6 @@ type receiveConfig struct {
topMetricsMinimumCardinality uint64
topMetricsUpdateInterval time.Duration
matcherConverterCacheCapacity int
maxPendingGrpcReadRequests int
maxPendingGrpcWriteRequests int

featureList *[]string
Expand Down Expand Up @@ -1165,9 +1152,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
Default("0").IntVar(&rc.matcherConverterCacheCapacity)
cmd.Flag("receive.max-pending-grcp-read-requests", "Throttle gRPC read requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcReadRequests)
cmd.Flag("receive.max-pending-grcp-write-requests", "Throttle gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,15 @@ func NewLimiterWithOptions(
r ReceiverMode,
logger log.Logger,
configReloadTimer time.Duration,
config LimiterOptions) (*Limiter, error) {
opts LimiterOptions) (*Limiter, error) {
limiter := &Limiter{
writeGate: gate.NewNoop(),
requestLimiter: &noopRequestLimiter{},
headSeriesLimiter: NewNopSeriesLimit(),
logger: logger,
receiverMode: r,
configReloadTimer: configReloadTimer,
maxPendingRequests: config.MaxPendingRequests,
maxPendingRequests: opts.MaxPendingRequests,
}

if reg != nil {
Expand Down
19 changes: 4 additions & 15 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory {
type SeriesSelectLimits struct {
SeriesPerRequest uint64
SamplesPerRequest uint64
PendingRequests int32
}

func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) {
cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest)
cmd.Flag("store.limits.request-samples", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest)
cmd.Flag("store.limits.pending-requests", "Reject gRPC series requests right away when this number of requests are pending. Value 0 disables this feature.").
Default("0").Int32Var(&l.PendingRequests)
}

var _ storepb.StoreServer = &limitedStoreServer{}
Expand All @@ -134,22 +137,8 @@ type limitedStoreServer struct {
pendingRequestsGauge prometheus.Gauge
}

type LimitsOptions struct {
// Value 0 disables the feature.
MaxPendingSeriesRequests int32
}

// NewLimitedStoreServer creates a new limitedStoreServer.
func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer {
return NewLimitedStoreServerWithOptions(store, reg, selectLimits, LimitsOptions{})
}

func NewLimitedStoreServerWithOptions(
store storepb.StoreServer,
reg prometheus.Registerer,
selectLimits SeriesSelectLimits,
opts LimitsOptions,
) storepb.StoreServer {
return &limitedStoreServer{
StoreServer: store,
newSeriesLimiter: NewSeriesLimiterFactory(selectLimits.SeriesPerRequest),
Expand All @@ -166,7 +155,7 @@ func NewLimitedStoreServerWithOptions(
Name: "thanos_store_server_hit_max_pending_series_requests_total",
Help: "Number of pending series requests that hit the max pending request limit",
}),
maxPendingRequests: opts.MaxPendingSeriesRequests,
maxPendingRequests: selectLimits.PendingRequests,
}
}

Expand Down

0 comments on commit cb958ba

Please sign in to comment.