Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polish Thanos Receive load shedding #118

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,8 +1073,8 @@ func quorumReached(successes []int, successThreshold int) bool {

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
if h.Limiter.ShouldRejectNewRequest() {
return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
if rejected, msg := h.Limiter.ShouldRejectNewRequest(); rejected {
return nil, status.Error(codes.ResourceExhausted, msg)
}
// NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false.
defer h.Limiter.DecrementPendingRequests()
Expand Down
22 changes: 12 additions & 10 deletions pkg/receive/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,29 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
return l.headSeriesLimiter
}

func (l *Limiter) ShouldRejectNewRequest() bool {
func (l *Limiter) ShouldRejectNewRequest() (bool, string) {
// maxPendingRequests doesn't change once set when a limiter instance is created.
// So, it's safe to read it without a lock.
if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
if l.maxPendingRequestLimitHit != nil {
l.maxPendingRequestLimitHit.Inc()
if l.maxPendingRequests > 0 {
if pendingRequests := l.pendingRequests.Load(); pendingRequests >= l.maxPendingRequests {
if l.maxPendingRequestLimitHit != nil {
l.maxPendingRequestLimitHit.Inc()
}
if l.pendingRequestsGauge != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think tracking pending requests should always be on; the nil check seems unnecessary and hurts code readability.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's how the existing instrumentation works. I believe there is a code path in which those metrics are not initialized.
image

image

l.pendingRequestsGauge.Set(float64(pendingRequests))
}
return true, fmt.Sprintf("too many pending write requests: %d >= %d", l.pendingRequests.Load(), l.maxPendingRequests)
}
return true
}
newValue := l.pendingRequests.Add(1)
if l.pendingRequestsGauge != nil {
l.pendingRequestsGauge.Set(float64(newValue))
}
return false
return false, ""
}

func (l *Limiter) DecrementPendingRequests() {
newValue := l.pendingRequests.Add(-1)
if l.pendingRequestsGauge != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this be nil?

l.pendingRequestsGauge.Set(float64(newValue))
}
l.pendingRequests.Add(-1)
}

// NewLimiter creates a new *Limiter given a configuration and prometheus
Expand Down
12 changes: 10 additions & 2 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package store

import (
"fmt"
"sync"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -160,8 +161,15 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
}

func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
return status.Error(codes.ResourceExhausted, "too many pending series requests")
if s.maxPendingRequests > 0 {
if pendingRequests := s.pendingRequests.Load(); pendingRequests >= s.maxPendingRequests {
s.maxPendingRequestLimitHit.Inc()
s.pendingRequestsGauge.Set(float64(pendingRequests))
return status.Error(
codes.ResourceExhausted,
fmt.Sprintf("too many pending series requests: %d >= %d", s.pendingRequests.Load(), s.maxPendingRequests),
)
}
}
s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
defer s.pendingRequests.Add(-1)
Expand Down
Loading