From f2673beadc3b08b62bc98c8905ff41fc86239bf9 Mon Sep 17 00:00:00 2001 From: "HC Zhu (DB)" Date: Wed, 18 Dec 2024 18:35:23 -0800 Subject: [PATCH] Fix load shedding --- pkg/receive/handler.go | 4 ++-- pkg/receive/limiter.go | 9 ++++++--- pkg/store/limiter.go | 8 +++++++- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index b6944efa44..4f8ff9acb9 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1073,8 +1073,8 @@ func quorumReached(successes []int, successThreshold int) bool { // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore. func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) { - if h.Limiter.ShouldRejectNewRequest() { - return nil, status.Error(codes.ResourceExhausted, "too many pending write requests") + if rejected, msg := h.Limiter.ShouldRejectNewRequest(); rejected { + return nil, status.Error(codes.ResourceExhausted, msg) } // NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false. defer h.Limiter.DecrementPendingRequests() diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index 9777df3be4..290e95fff4 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -77,20 +77,23 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter { return l.headSeriesLimiter } -func (l *Limiter) ShouldRejectNewRequest() bool { +func (l *Limiter) ShouldRejectNewRequest() (bool, string) { // maxPendingRequests doesn't change once set when a limiter instance is created. // So, it's safe to read it without a lock. if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests { if l.maxPendingRequestLimitHit != nil { l.maxPendingRequestLimitHit.Inc() } - return true + if l.pendingRequestsGauge != nil { + l.pendingRequestsGauge.Set(float64(l.pendingRequests.Load())) + } + return true, fmt.Sprintf("too many pending write requests: %d >= %d", l.pendingRequests.Load(), l.maxPendingRequests) } newValue := l.pendingRequests.Add(1) if l.pendingRequestsGauge != nil { l.pendingRequestsGauge.Set(float64(newValue)) } - return false + return false, "" } func (l *Limiter) DecrementPendingRequests() { diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index ee19c1f480..ad0fd4cc18 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -4,6 +4,7 @@ package store import ( + "fmt" "sync" "github.com/alecthomas/units" @@ -161,7 +162,12 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests { - return status.Error(codes.ResourceExhausted, "too many pending series requests") + s.maxPendingRequestLimitHit.Inc() + s.pendingRequestsGauge.Set(float64(s.pendingRequests.Load())) + return status.Error( + codes.ResourceExhausted, + fmt.Sprintf("too many pending series requests: %d >= %d", s.pendingRequests.Load(), s.maxPendingRequests), + ) } s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1))) defer s.pendingRequests.Add(-1)