Skip to content

Commit

Permalink
Cleanup stats and error cases
Browse files Browse the repository at this point in the history
  • Loading branch information
beautifulentropy committed Sep 27, 2023
1 parent 9fe6ca5 commit 30de5d7
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 99 deletions.
51 changes: 25 additions & 26 deletions cmd/boulder-wfe2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func loadChain(certFiles []string) (*issuance.Certificate, []byte, error) {
return certs[0], buf.Bytes(), nil
}

func setupWFE(c Config, scope prometheus.Registerer, clk clock.Clock, log blog.Logger) (rapb.RegistrationAuthorityClient, sapb.StorageAuthorityReadOnlyClient, nonce.Getter, map[string]nonce.Redeemer, nonce.Redeemer, string, *ratelimits.Limiter, *bredis.Lookup) {
func setupWFE(c Config, scope prometheus.Registerer, clk clock.Clock) (rapb.RegistrationAuthorityClient, sapb.StorageAuthorityReadOnlyClient, nonce.Getter, map[string]nonce.Redeemer, nonce.Redeemer, string) {
tlsConfig, err := c.WFE.TLS.Load(scope)
cmd.FailOnError(err, "TLS config")

Expand Down Expand Up @@ -255,26 +255,7 @@ func setupWFE(c Config, scope prometheus.Registerer, clk clock.Clock, log blog.L
}
}

var limiter *ratelimits.Limiter
var limiterLookup *bredis.Lookup
if c.WFE.Limiter.Defaults != "" {
// Setup rate limiting.
var ring *redis.Ring
if len(c.WFE.Limiter.Redis.Lookups) > 0 {
// Configure a Redis client with periodic SRV lookups.
ring, limiterLookup, err = c.WFE.Limiter.Redis.NewRingWithPeriodicLookups(scope, log)
cmd.FailOnError(err, "Failed to create Redis SRV Lookup for rate limiting")
} else {
// Configure a Redis client with static shard addresses.
ring, err = c.WFE.Limiter.Redis.NewRing(scope)
cmd.FailOnError(err, "Failed to create Redis client for rate limiting")
}
source := ratelimits.NewRedisSource(ring, clk, scope)
limiter, err = ratelimits.NewLimiter(clk, source, c.WFE.Limiter.Defaults, c.WFE.Limiter.Overrides, scope)
cmd.FailOnError(err, "Failed to create rate limiter")
}

return rac, sac, gnc, npm, rnc, rncKey, limiter, limiterLookup
return rac, sac, gnc, npm, rnc, rncKey
}

type errorWriter struct {
Expand Down Expand Up @@ -332,7 +313,7 @@ func main() {

clk := cmd.Clock()

rac, sac, gnc, npm, rnc, npKey, limiter, limiterLookup := setupWFE(c, stats, clk, logger)
rac, sac, gnc, npm, rnc, npKey := setupWFE(c, stats, clk)

kp, err := sagoodkey.NewKeyPolicy(&c.WFE.GoodKey, sac.KeyBlocked)
cmd.FailOnError(err, "Unable to create key policy")
Expand All @@ -359,6 +340,25 @@ func main() {
}
pendingAuthorizationLifetime := time.Duration(c.WFE.PendingAuthorizationLifetimeDays) * 24 * time.Hour

var limiter *ratelimits.Limiter
var limiterLookup *bredis.Lookup
if c.WFE.Limiter.Defaults != "" {
// Setup rate limiting.
var ring *redis.Ring
if len(c.WFE.Limiter.Redis.Lookups) > 0 {
// Configure a Redis client with periodic SRV lookups.
ring, limiterLookup, err = c.WFE.Limiter.Redis.NewRingWithPeriodicLookups(stats, logger)
cmd.FailOnError(err, "Failed to create Redis SRV Lookup for rate limiting")
} else {
// Configure a Redis client with static shard addresses.
ring, err = c.WFE.Limiter.Redis.NewRing(stats)
cmd.FailOnError(err, "Failed to create Redis client for rate limiting")
}
source := ratelimits.NewRedisSource(ring, clk, stats)
limiter, err = ratelimits.NewLimiter(clk, source, c.WFE.Limiter.Defaults, c.WFE.Limiter.Overrides, stats)
cmd.FailOnError(err, "Failed to create rate limiter")
}

var accountGetter wfe2.AccountGetter
if c.WFE.AccountCache != nil {
accountGetter = wfe2.NewAccountCache(sac,
Expand Down Expand Up @@ -392,7 +392,7 @@ func main() {
cmd.FailOnError(err, "Unable to create WFE")

var limiterCtx context.Context
var shutdownLimiterLookup context.CancelFunc
var shutdownLimiterLookup context.CancelFunc = func() {}
if limiterLookup != nil {
limiterCtx, shutdownLimiterLookup = context.WithCancel(context.Background())
limiterLookup.Start(limiterCtx)
Expand Down Expand Up @@ -447,13 +447,12 @@ func main() {
// ListenAndServe() and ListenAndServeTLS() to immediately return, then waits
// for any lingering connection-handling goroutines to finish their work.
defer func() {
if limiterLookup != nil {
defer shutdownLimiterLookup()
}

ctx, cancel := context.WithTimeout(context.Background(), c.WFE.ShutdownStopTimeout.Duration)
defer cancel()
_ = srv.Shutdown(ctx)
_ = tlsSrv.Shutdown(ctx)
shutdownLimiterLookup()
oTelShutdown(ctx)
}()

Expand Down
2 changes: 1 addition & 1 deletion ratelimits/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func loadAndParseOverrideLimits(path string) (limits, error) {
err = validateIdForName(name, id)
if err != nil {
return nil, fmt.Errorf(
"validating name %s and id %q for override limit %q: %w", name.String(), id, k, err)
"validating name %s and id %q for override limit %q: %w", name, id, k, err)
}
if name == CertificatesPerFQDNSetPerAccount {
// FQDNSet hashes are not a nice thing to ask for in a config file,
Expand Down
31 changes: 20 additions & 11 deletions ratelimits/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ const (
Denied = "denied"
)

// ErrLimitDisabled indicates that the limit name specified is valid but is not
// currently configured.
var ErrLimitDisabled = errors.New("limit disabled")

// ErrInvalidCost indicates that the cost specified was <= 0.
var ErrInvalidCost = fmt.Errorf("invalid cost, must be > 0")

Expand All @@ -33,9 +29,13 @@ var ErrInvalidCostForCheck = fmt.Errorf("invalid check cost, must be >= 0")
// ErrInvalidCostOverLimit indicates that the cost specified was > limit.Burst.
var ErrInvalidCostOverLimit = fmt.Errorf("invalid cost, must be <= limit.Burst")

// ErrBucketAlreadyFull indicates that the bucket is already at maximum
// capacity. A refund is not possible.
var ErrBucketAlreadyFull = fmt.Errorf("bucket already full")
// errLimitDisabled indicates that the limit name specified is valid but is not
// currently configured. It is unexported on purpose: Check, Spend, and Refund
// intercept it and return disabledLimitDecision instead, so callers outside
// this package never observe it.
var errLimitDisabled = errors.New("limit disabled")

// disabledLimitDecision is an "allowed" *Decision that should be returned when
// a checked limit is found to be disabled. The literal is positional:
// presumably Allowed=true with zeroed remaining/retry/reset counters and a
// zero reset time — TODO confirm against the Decision struct definition.
var disabledLimitDecision = &Decision{true, 0, 0, 0, time.Time{}}

// Limiter provides a high-level interface for rate limiting requests by
// utilizing a leaky bucket-style approach.
Expand Down Expand Up @@ -136,6 +136,9 @@ func (l *Limiter) Check(ctx context.Context, name Name, id string, cost int64) (

limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}

Expand Down Expand Up @@ -176,6 +179,9 @@ func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (

limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}

Expand Down Expand Up @@ -236,15 +242,17 @@ func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (
// capacity. However, partial refunds are allowed and are considered successful.
// For instance, if a bucket has a maximum capacity of 10 and currently has 5
// requests remaining, a refund request of 7 will result in the bucket reaching
// its maximum capacity of 10, not 12. If the specified limit is disabled,
// ErrLimitDisabled is returned.
// its maximum capacity of 10, not 12.
func (l *Limiter) Refund(ctx context.Context, name Name, id string, cost int64) (*Decision, error) {
if cost <= 0 {
return nil, ErrInvalidCost
}

limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}

Expand All @@ -254,7 +262,8 @@ func (l *Limiter) Refund(ctx context.Context, name Name, id string, cost int64)
}
d := maybeRefund(l.clk, limit, tat, cost)
if !d.Allowed {
return d, ErrBucketAlreadyFull
// The bucket is already at maximum capacity.
return d, nil
}
return d, l.source.Set(ctx, bucketKey(name, id), d.newTAT)

Expand Down Expand Up @@ -298,5 +307,5 @@ func (l *Limiter) getLimit(name Name, id string) (limit, error) {
if ok {
return dl, nil
}
return limit{}, ErrLimitDisabled
return limit{}, errLimitDisabled
}
3 changes: 2 additions & 1 deletion ratelimits/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ func Test_Limiter_RefundAndReset(t *testing.T) {

// Refund 1 requests above our limit, this should fail.
d, err = l.Refund(testCtx, NewRegistrationsPerIPAddress, testIP, 1)
test.AssertErrorIs(t, err, ErrBucketAlreadyFull)
test.AssertNotError(t, err, "should not error")
test.Assert(t, !d.Allowed, "should not be allowed")
test.AssertEquals(t, d.Remaining, int64(20))
})
}
Expand Down
70 changes: 24 additions & 46 deletions ratelimits/source_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,55 +16,29 @@ var _ source = (*RedisSource)(nil)

// RedisSource is a ratelimits source backed by sharded Redis.
type RedisSource struct {
client *redis.Ring
clk clock.Clock
setLatency *prometheus.HistogramVec
getLatency *prometheus.HistogramVec
deleteLatency *prometheus.HistogramVec
client *redis.Ring
clk clock.Clock
latency *prometheus.HistogramVec
}

// NewRedisSource returns a new Redis backed source using the provided
// *redis.Ring client.
func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Registerer) *RedisSource {
// Exponential buckets ranging from 0.0005s to 3s.
buckets := prometheus.ExponentialBucketsRange(0.0005, 3, 8)

setLatency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "ratelimits_set_latency",
Help: "Histogram of RedisSource.Set() call latencies labeled by result",
Buckets: buckets,
},
[]string{"result"},
)
stats.MustRegister(setLatency)

getLatency := prometheus.NewHistogramVec(
latency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "ratelimits_get_latency",
Help: "Histogram of RedisSource.Get() call latencies labeled by result",
Buckets: buckets,
Name: "ratelimits_latency",
Help: "Histogram of Redis call latencies labeled by call=[set|get|delete|ping] and result=[success|error]",
// Exponential buckets ranging from 0.0005s to 3s.
Buckets: prometheus.ExponentialBucketsRange(0.0005, 3, 8),
},
[]string{"result"},
[]string{"call", "result"},
)
stats.MustRegister(getLatency)

deleteLatency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "ratelimits_delete_latency",
Help: "Histogram of RedisSource.Delete() call latencies labeled by result",
Buckets: buckets,
},
[]string{"result"},
)
stats.MustRegister(deleteLatency)
stats.MustRegister(latency)

return &RedisSource{
client: client,
clk: clk,
setLatency: setLatency,
getLatency: getLatency,
deleteLatency: deleteLatency,
client: client,
clk: clk,
latency: latency,
}
}

Expand Down Expand Up @@ -102,11 +76,11 @@ func (r *RedisSource) Set(ctx context.Context, bucketKey string, tat time.Time)

err := r.client.Set(ctx, bucketKey, tat.UnixNano(), 0).Err()
if err != nil {
r.setLatency.With(prometheus.Labels{"result": resultForError(err)}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "set", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
return err
}

r.setLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "set", "result": "success"}).Observe(time.Since(start).Seconds())
return nil
}

Expand All @@ -120,14 +94,14 @@ func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, err
if err != nil {
if errors.Is(err, redis.Nil) {
// Bucket key does not exist.
r.getLatency.With(prometheus.Labels{"result": "notFound"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "get", "result": "notFound"}).Observe(time.Since(start).Seconds())
return time.Time{}, ErrBucketNotFound
}
r.getLatency.With(prometheus.Labels{"result": resultForError(err)}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "get", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
return time.Time{}, err
}

r.getLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "get", "result": "success"}).Observe(time.Since(start).Seconds())
return time.Unix(0, tatNano).UTC(), nil
}

Expand All @@ -139,22 +113,26 @@ func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error {

err := r.client.Del(ctx, bucketKey).Err()
if err != nil {
r.deleteLatency.With(prometheus.Labels{"result": resultForError(err)}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "delete", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
return err
}

r.deleteLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "delete", "result": "success"}).Observe(time.Since(start).Seconds())
return nil
}

// Ping checks that each shard of the *redis.Ring is reachable using the PING
// command. It returns an error if any shard is unreachable and nil otherwise.
// The latency of the full sweep is recorded under call="ping", labeled by
// outcome.
func (r *RedisSource) Ping(ctx context.Context) error {
	start := r.clk.Now()

	pingErr := r.client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
		return shard.Ping(ctx).Err()
	})

	// One observation covers the whole sweep; the result label lets
	// dashboards split successes from failures.
	result := "success"
	if pingErr != nil {
		result = resultForError(pingErr)
	}
	r.latency.With(prometheus.Labels{"call": "ping", "result": result}).Observe(time.Since(start).Seconds())
	return pingErr
}
16 changes: 2 additions & 14 deletions wfe2/wfe.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,8 @@ func (wfe *WebFrontEndImpl) checkNewAccountLimits(ctx context.Context, ip net.IP
}
decision, err := wfe.limiter.Spend(ctx, ratelimits.NewRegistrationsPerIPAddress, ip.String(), 1)
if err != nil {
if errors.Is(err, ratelimits.ErrLimitDisabled) {
return
}
// TODO(#5545): Once key-value rate limits are authoritative this log
// line should be removed in favor of returning the error.
wfe.log.Warningf("checking %s rate limit: %s", ratelimits.NewRegistrationsPerIPAddress, err)
return
}
Expand All @@ -647,9 +646,6 @@ func (wfe *WebFrontEndImpl) checkNewAccountLimits(ctx context.Context, ip net.IP
ipNet := &net.IPNet{IP: ip.Mask(ipMask), Mask: ipMask}
_, err = wfe.limiter.Spend(ctx, ratelimits.NewRegistrationsPerIPv6Range, ipNet.String(), 1)
if err != nil {
if errors.Is(err, ratelimits.ErrLimitDisabled) {
return
}
wfe.log.Warningf("checking %s rate limit: %s", ratelimits.NewRegistrationsPerIPv6Range, err)
return
}
Expand All @@ -666,10 +662,6 @@ func (wfe *WebFrontEndImpl) refundNewAccountLimits(ctx context.Context, ip net.I
}
_, err := wfe.limiter.Refund(ctx, ratelimits.NewRegistrationsPerIPAddress, ip.String(), 1)
if err != nil {
if errors.Is(err, ratelimits.ErrLimitDisabled) ||
errors.Is(err, ratelimits.ErrBucketAlreadyFull) {
return
}
wfe.log.Warningf("refunding new account rate limit: %s", err)
return
}
Expand All @@ -684,10 +676,6 @@ func (wfe *WebFrontEndImpl) refundNewAccountLimits(ctx context.Context, ip net.I
ipNet := &net.IPNet{IP: ip.Mask(ipMask), Mask: ipMask}
_, err = wfe.limiter.Refund(ctx, ratelimits.NewRegistrationsPerIPv6Range, ipNet.String(), 1)
if err != nil {
if errors.Is(err, ratelimits.ErrLimitDisabled) ||
errors.Is(err, ratelimits.ErrBucketAlreadyFull) {
return
}
wfe.log.Warningf("refunding new account rate limit: %s", err)
}
}
Expand Down

0 comments on commit 30de5d7

Please sign in to comment.