From 5438a4619b8927945f4f60d5a703b784397f071b Mon Sep 17 00:00:00 2001
From: "HC Zhu (DB)"
Date: Wed, 18 Dec 2024 18:35:23 -0800
Subject: [PATCH] Fix load shedding

---
 pkg/receive/handler.go |  4 ++--
 pkg/receive/limiter.go | 28 ++++++++++++++++++----------
 pkg/store/limiter.go   | 12 ++++++++++--
 3 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index b6944efa44..4f8ff9acb9 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -1073,8 +1073,8 @@ func quorumReached(successes []int, successThreshold int) bool {
 
 // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
 func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
-	if h.Limiter.ShouldRejectNewRequest() {
-		return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
+	if rejected, msg := h.Limiter.ShouldRejectNewRequest(); rejected {
+		return nil, status.Error(codes.ResourceExhausted, msg)
 	}
 	// NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false.
 	defer h.Limiter.DecrementPendingRequests()
diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go
index 9777df3be4..1de2ae3942 100644
--- a/pkg/receive/limiter.go
+++ b/pkg/receive/limiter.go
@@ -77,27 +77,35 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
 	return l.headSeriesLimiter
 }
 
+// ShouldRejectNewRequest reports whether a new request must be shed because the
+// number of pending requests has reached maxPendingRequests (a limit of zero or
+// less disables the check). When rejecting, it also returns a message carrying the
+// observed count and the limit; otherwise it increments the pending counter and
+// returns an empty message.
-func (l *Limiter) ShouldRejectNewRequest() bool {
+func (l *Limiter) ShouldRejectNewRequest() (bool, string) {
 	// maxPendingRequests doesn't change once set when a limiter instance is created.
 	// So, it's safe to read it without a lock.
-	if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
-		if l.maxPendingRequestLimitHit != nil {
-			l.maxPendingRequestLimitHit.Inc()
+	if l.maxPendingRequests > 0 {
+		if pendingRequests := l.pendingRequests.Load(); pendingRequests >= l.maxPendingRequests {
+			if l.maxPendingRequestLimitHit != nil {
+				l.maxPendingRequestLimitHit.Inc()
+			}
+			if l.pendingRequestsGauge != nil {
+				l.pendingRequestsGauge.Set(float64(pendingRequests))
+			}
+			return true, fmt.Sprintf("too many pending write requests: %d >= %d", pendingRequests, l.maxPendingRequests)
 		}
-		return true
 	}
 	newValue := l.pendingRequests.Add(1)
 	if l.pendingRequestsGauge != nil {
 		l.pendingRequestsGauge.Set(float64(newValue))
 	}
-	return false
+	return false, ""
 }
 
+// DecrementPendingRequests decrements the pending request counter. It does not
+// update pendingRequestsGauge; ShouldRejectNewRequest refreshes the gauge.
 func (l *Limiter) DecrementPendingRequests() {
-	newValue := l.pendingRequests.Add(-1)
-	if l.pendingRequestsGauge != nil {
-		l.pendingRequestsGauge.Set(float64(newValue))
-	}
+	l.pendingRequests.Add(-1)
 }
 
 // NewLimiter creates a new *Limiter given a configuration and prometheus
diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go
index ee19c1f480..11a7b7f7b3 100644
--- a/pkg/store/limiter.go
+++ b/pkg/store/limiter.go
@@ -4,6 +4,7 @@
 package store
 
 import (
+	"fmt"
 	"sync"
 
 	"github.com/alecthomas/units"
@@ -160,8 +161,15 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
 }
 
 func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
-	if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
-		return status.Error(codes.ResourceExhausted, "too many pending series requests")
+	if s.maxPendingRequests > 0 {
+		if pendingRequests := s.pendingRequests.Load(); pendingRequests >= s.maxPendingRequests {
+			s.maxPendingRequestLimitHit.Inc()
+			s.pendingRequestsGauge.Set(float64(pendingRequests))
+			return status.Error(
+				codes.ResourceExhausted,
+				fmt.Sprintf("too many pending series requests: %d >= %d", pendingRequests, s.maxPendingRequests),
+			)
+		}
 	}
 	s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
 	defer s.pendingRequests.Add(-1)