WFE: Add new key-value ratelimits implementation #7089

Merged 15 commits on Oct 4, 2023
35 changes: 35 additions & 0 deletions cmd/boulder-wfe2/main.go
@@ -25,6 +25,8 @@ import (
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/nonce"
rapb "github.com/letsencrypt/boulder/ra/proto"
"github.com/letsencrypt/boulder/ratelimits"
bredis "github.com/letsencrypt/boulder/redis"
sapb "github.com/letsencrypt/boulder/sa/proto"
"github.com/letsencrypt/boulder/wfe2"
)
@@ -137,6 +139,25 @@ type Config struct {
PendingAuthorizationLifetimeDays int `validate:"required,min=1,max=29"`

AccountCache *CacheConfig

Limiter struct {
// Redis contains the configuration necessary to connect to Redis
// for rate limiting. This field is required to enable rate
// limiting.
Redis *bredis.Config `validate:"required_with=Defaults"`

// Defaults is a path to a YAML file containing default rate limits.
// See: ratelimits/README.md for details. This field is required to
// enable rate limiting. If any individual rate limit is not set,
// that limit will be disabled.
Defaults string `validate:"required_with=Redis"`

// Overrides is a path to a YAML file containing overrides for the
// default rate limits. See: ratelimits/README.md for details. If
// this field is not set, all requesters will be subject to the
// default rate limits.
Overrides string
}
}

Syslog cmd.SyslogConfig
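The Redis and Defaults fields enable rate limiting only as a pair (the required_with validate tags enforce this), while Overrides stays optional. Both Defaults and Overrides point at YAML files whose schema is documented in ratelimits/README.md; as an illustration only (not part of this diff), a minimal defaults file might look like the sketch below. The burst/count/period field names are an assumption here, so defer to the README for the real schema.

package example

// exampleDefaults sketches a hypothetical Limiter.Defaults file. The top-level
// keys are limit names from ratelimits/names.go; the burst/count/period fields
// are assumed for illustration, not taken from this PR.
const exampleDefaults = `
NewRegistrationsPerIPAddress:
  burst: 20
  count: 20
  period: 1s
`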
@@ -318,6 +339,18 @@ func main() {
}
pendingAuthorizationLifetime := time.Duration(c.WFE.PendingAuthorizationLifetimeDays) * 24 * time.Hour

var limiter *ratelimits.Limiter
var limiterRedis *bredis.Ring
if c.WFE.Limiter.Defaults != "" {
// Set up rate limiting.
limiterRedis, err = bredis.NewRingFromConfig(*c.WFE.Limiter.Redis, stats, logger)
cmd.FailOnError(err, "Failed to create Redis ring")

source := ratelimits.NewRedisSource(limiterRedis.Ring, clk, stats)
limiter, err = ratelimits.NewLimiter(clk, source, c.WFE.Limiter.Defaults, c.WFE.Limiter.Overrides, stats)
cmd.FailOnError(err, "Failed to create rate limiter")
}

var accountGetter wfe2.AccountGetter
if c.WFE.AccountCache != nil {
accountGetter = wfe2.NewAccountCache(sac,
@@ -346,6 +379,7 @@ func main() {
rnc,
npKey,
accountGetter,
limiter,
)
cmd.FailOnError(err, "Unable to create WFE")

@@ -402,6 +436,7 @@ func main() {
defer cancel()
_ = srv.Shutdown(ctx)
_ = tlsSrv.Shutdown(ctx)
limiterRedis.StopLookups()
oTelShutdown(ctx)
}()

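For orientation (this sketch is not part of the diff): once main() hands the limiter to the WFE constructor above, a handler can spend against a named limit and turn a denied Decision into a client-facing error. The sketch assumes the caller can derive a client IP and that Decision exposes a RetryIn duration, as referenced by the tests; the real WFE plumbing lives outside this file.

package example

import (
	"context"
	"fmt"

	"github.com/letsencrypt/boulder/ratelimits"
)

// checkNewRegistrationLimit sketches how a WFE handler might consume the
// limiter constructed in main(). The limiter is nil when Limiter.Defaults is
// not configured, so rate limiting degrades to a no-op.
func checkNewRegistrationLimit(ctx context.Context, limiter *ratelimits.Limiter, clientIP string) error {
	if limiter == nil {
		return nil
	}
	d, err := limiter.Spend(ctx, ratelimits.NewRegistrationsPerIPAddress, clientIP, 1)
	if err != nil {
		return err
	}
	if !d.Allowed {
		// RetryIn is assumed to be a time.Duration on Decision.
		return fmt.Errorf("too many new registrations from %s, retry in %s", clientIP, d.RetryIn)
	}
	return nil
}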
2 changes: 1 addition & 1 deletion ratelimits/limit.go
@@ -119,7 +119,7 @@ func loadAndParseOverrideLimits(path string) (limits, error) {
err = validateIdForName(name, id)
if err != nil {
return nil, fmt.Errorf(
"validating name %s and id %q for override limit %q: %w", nameToString[name], id, k, err)
"validating name %s and id %q for override limit %q: %w", name, id, k, err)
}
if name == CertificatesPerFQDNSetPerAccount {
// FQDNSet hashes are not a nice thing to ask for in a config file,
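Aside (not in the diff): dropping the nameToString lookup is safe because Name gains a String method in ratelimits/names.go below, and fmt invokes String for the %s and %q verbs used in these messages, so the error still prints the human-readable limit name rather than an integer. A minimal sketch:

package example

import (
	"fmt"

	"github.com/letsencrypt/boulder/ratelimits"
)

// formatExample shows the two verbs used by the error messages in this
// package; both go through Name.String rather than printing the raw integer.
func formatExample() {
	fmt.Printf("%s\n", ratelimits.NewRegistrationsPerIPAddress)
	fmt.Printf("%q\n", ratelimits.NewRegistrationsPerIPAddress)
}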
11 changes: 4 additions & 7 deletions ratelimits/limit_test.go
@@ -11,33 +11,30 @@ import (
)

func Test_parseOverrideNameId(t *testing.T) {
newRegistrationsPerIPAddressStr := nameToString[NewRegistrationsPerIPAddress]
newRegistrationsPerIPv6RangeStr := nameToString[NewRegistrationsPerIPv6Range]

// 'enum:ipv4'
// Valid IPv4 address.
name, id, err := parseOverrideNameId(newRegistrationsPerIPAddressStr + ":10.0.0.1")
name, id, err := parseOverrideNameId(NewRegistrationsPerIPAddress.String() + ":10.0.0.1")
test.AssertNotError(t, err, "should not error")
test.AssertEquals(t, name, NewRegistrationsPerIPAddress)
test.AssertEquals(t, id, "10.0.0.1")

// 'enum:ipv6range'
// Valid IPv6 address range.
name, id, err = parseOverrideNameId(newRegistrationsPerIPv6RangeStr + ":2001:0db8:0000::/48")
name, id, err = parseOverrideNameId(NewRegistrationsPerIPv6Range.String() + ":2001:0db8:0000::/48")
test.AssertNotError(t, err, "should not error")
test.AssertEquals(t, name, NewRegistrationsPerIPv6Range)
test.AssertEquals(t, id, "2001:0db8:0000::/48")

// Missing colon (this should never happen but we should avoid panicking).
_, _, err = parseOverrideNameId(newRegistrationsPerIPAddressStr + "10.0.0.1")
_, _, err = parseOverrideNameId(NewRegistrationsPerIPAddress.String() + "10.0.0.1")
test.AssertError(t, err, "missing colon")

// Empty string.
_, _, err = parseOverrideNameId("")
test.AssertError(t, err, "empty string")

// Only a colon.
_, _, err = parseOverrideNameId(newRegistrationsPerIPAddressStr + ":")
_, _, err = parseOverrideNameId(NewRegistrationsPerIPAddress.String() + ":")
test.AssertError(t, err, "only a colon")

// Invalid enum.
88 changes: 77 additions & 11 deletions ratelimits/limiter.go
@@ -29,9 +29,13 @@ var ErrInvalidCostForCheck = fmt.Errorf("invalid check cost, must be >= 0")
// ErrInvalidCostOverLimit indicates that the cost specified was > limit.Burst.
var ErrInvalidCostOverLimit = fmt.Errorf("invalid cost, must be <= limit.Burst")

// ErrBucketAlreadyFull indicates that the bucket already has reached its
// maximum capacity.
var ErrBucketAlreadyFull = fmt.Errorf("bucket already full")
// errLimitDisabled indicates that the limit name specified is valid but is not
// currently configured.
var errLimitDisabled = errors.New("limit disabled")

// disabledLimitDecision is an "allowed" *Decision that should be returned when
// a checked limit is found to be disabled.
var disabledLimitDecision = &Decision{true, 0, 0, 0, time.Time{}}

// Limiter provides a high-level interface for rate limiting requests by
// utilizing a leaky bucket-style approach.
@@ -46,6 +50,7 @@ type Limiter struct {
source source
clk clock.Clock

spendLatency *prometheus.HistogramVec
overrideUsageGauge *prometheus.GaugeVec
}

@@ -62,6 +67,14 @@ func NewLimiter(clk clock.Clock, source source, defaults, overrides string, stat
return nil, err
}

limiter.spendLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "ratelimits_spend_latency",
Help: fmt.Sprintf("Latency of ratelimit checks labeled by limit=[name] and decision=[%s|%s], in seconds", Allowed, Denied),
// Exponential buckets ranging from 0.0005s to ~1.1s (8 buckets, growth factor 3).
Buckets: prometheus.ExponentialBuckets(0.0005, 3, 8),
}, []string{"limit", "decision"})
stats.MustRegister(limiter.spendLatency)

if overrides == "" {
// No overrides specified, initialize an empty map.
limiter.overrides = make(limits)
@@ -114,21 +127,28 @@ type Decision struct {
// wait time before the client can make another request, and the time until the
// bucket refills to its maximum capacity (resets). If no bucket exists for the
// given limit Name and client id, a new one will be created WITHOUT the
// request's cost deducted from its initial capacity.
// request's cost deducted from its initial capacity. If the specified limit is
// disabled, a Decision that always allows the request is returned.
func (l *Limiter) Check(ctx context.Context, name Name, id string, cost int64) (*Decision, error) {
if cost < 0 {
return nil, ErrInvalidCostForCheck
}

limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}

if cost > limit.Burst {
return nil, ErrInvalidCostOverLimit
}

// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tat, err := l.source.Get(ctx, bucketKey(name, id))
if err != nil {
if !errors.Is(err, ErrBucketNotFound) {
@@ -153,26 +173,46 @@ func (l *Limiter) Check(ctx context.Context, name Name, id string, cost int64) (
// required wait time before the client can make another request, and the time
// until the bucket refills to its maximum capacity (resets). If no bucket
// exists for the given limit Name and client id, a new one will be created WITH
// the request's cost deducted from its initial capacity.
// the request's cost deducted from its initial capacity. If the specified limit
// is disabled, a Decision that always allows the request is returned.
func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (*Decision, error) {
if cost <= 0 {
return nil, ErrInvalidCost
}

limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}

if cost > limit.Burst {
return nil, ErrInvalidCostOverLimit
}

start := l.clk.Now()
status := Denied
defer func() {
l.spendLatency.WithLabelValues(name.String(), status).Observe(l.clk.Since(start).Seconds())
}()

// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tat, err := l.source.Get(ctx, bucketKey(name, id))
if err != nil {
if errors.Is(err, ErrBucketNotFound) {
// First request from this client.
return l.initialize(ctx, limit, name, id, cost)
d, err := l.initialize(ctx, limit, name, id, cost)
if err != nil {
return nil, err
}
if d.Allowed {
status = Allowed
}
return d, nil
}
return nil, err
}
@@ -183,13 +223,19 @@ func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (
// Calculate the current utilization of the override limit for the
// specified client id.
utilization := float64(limit.Burst-d.Remaining) / float64(limit.Burst)
l.overrideUsageGauge.WithLabelValues(nameToString[name], id).Set(utilization)
l.overrideUsageGauge.WithLabelValues(name.String(), id).Set(utilization)
}

if !d.Allowed {
return d, nil
}
return d, l.source.Set(ctx, bucketKey(name, id), d.newTAT)

err = l.source.Set(ctx, bucketKey(name, id), d.newTAT)
if err != nil {
return nil, err
}
status = Allowed
return d, nil
}

// Refund attempts to refund the cost to the bucket identified by limit name and
@@ -210,30 +256,44 @@ func (l *Limiter) Refund(ctx context.Context, name Name, id string, cost int64)

limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}

// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tat, err := l.source.Get(ctx, bucketKey(name, id))
if err != nil {
return nil, err
}
d := maybeRefund(l.clk, limit, tat, cost)
if !d.Allowed {
return d, ErrBucketAlreadyFull
// The bucket is already at maximum capacity.
return d, nil
}
return d, l.source.Set(ctx, bucketKey(name, id), d.newTAT)

}

// Reset resets the specified bucket.
func (l *Limiter) Reset(ctx context.Context, name Name, id string) error {
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
return l.source.Delete(ctx, bucketKey(name, id))
}

// initialize creates a new bucket, specified by limit name and id, with the
// cost of the request factored into the initial state.
func (l *Limiter) initialize(ctx context.Context, rl limit, name Name, id string, cost int64) (*Decision, error) {
d := maybeSpend(l.clk, rl, l.clk.Now(), cost)

// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
err := l.source.Set(ctx, bucketKey(name, id), d.newTAT)
if err != nil {
return nil, err
@@ -244,8 +304,14 @@ func (l *Limiter) initialize(ctx context.Context, rl limit, name Name, id string

// getLimit returns the limit specified by name and id; name is
// required, id is optional. If id is left unspecified, the default limit for
// the limit specified by name is returned.
// the limit specified by name is returned. If no default limit exists for the
// specified name, errLimitDisabled is returned.
func (l *Limiter) getLimit(name Name, id string) (limit, error) {
if !name.isValid() {
// This should never happen. Callers should only be specifying the limit
// Name enums defined in this package.
return limit{}, fmt.Errorf("specified name enum %q, is invalid", name)
}
if id != "" {
// Check for override.
ol, ok := l.overrides[bucketKey(name, id)]
@@ -257,5 +323,5 @@ func (l *Limiter) getLimit(name Name, id string) (limit, error) {
if ok {
return dl, nil
}
return limit{}, fmt.Errorf("limit %q does not exist", name)
return limit{}, errLimitDisabled
}
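Taken together, the hunks above change the Limiter contract in three caller-visible ways: a limit with no configured default is silently treated as disabled, Refund no longer returns ErrBucketAlreadyFull for a full bucket, and Spend records its latency in the new histogram. A sketch of the resulting behavior (not part of the diff), assuming a limiter whose defaults configure NewRegistrationsPerIPAddress but leave NewOrdersPerAccount unset:

package example

import (
	"context"
	"fmt"

	"github.com/letsencrypt/boulder/ratelimits"
)

func demonstrateSemantics(ctx context.Context, l *ratelimits.Limiter) error {
	// Check reports whether a spend would be allowed without deducting cost.
	if _, err := l.Check(ctx, ratelimits.NewRegistrationsPerIPAddress, "10.0.0.1", 1); err != nil {
		return err
	}

	// Spend deducts the cost when allowed and observes ratelimits_spend_latency.
	d, err := l.Spend(ctx, ratelimits.NewRegistrationsPerIPAddress, "10.0.0.1", 1)
	if err != nil {
		return err
	}

	// Refund is best-effort: refunding into an already-full bucket returns a
	// Decision with Allowed == false instead of ErrBucketAlreadyFull.
	if d.Allowed {
		if _, err := l.Refund(ctx, ratelimits.NewRegistrationsPerIPAddress, "10.0.0.1", 1); err != nil {
			return err
		}
	}

	// A limit without a configured default is disabled: no error, and the
	// returned Decision always allows the request.
	d, err = l.Spend(ctx, ratelimits.NewOrdersPerAccount, "12345", 1)
	if err != nil {
		return err
	}
	fmt.Println("disabled limit allowed:", d.Allowed) // always true
	return nil
}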
7 changes: 4 additions & 3 deletions ratelimits/limiter_test.go
@@ -81,7 +81,7 @@ func Test_Limiter_CheckWithLimitOverrides(t *testing.T) {
// Verify our overrideUsageGauge is being set correctly. 0.0 == 0% of
// the bucket has been consumed.
test.AssertMetricWithLabelsEquals(t, l.overrideUsageGauge, prometheus.Labels{
"limit": nameToString[NewRegistrationsPerIPAddress], "client_id": tenZeroZeroTwo}, 0)
"limit": NewRegistrationsPerIPAddress.String(), "client_id": tenZeroZeroTwo}, 0)

// Attempt to check a spend of 41 requests (a cost > the limit burst
// capacity), this should fail with a specific error.
@@ -108,7 +108,7 @@ func Test_Limiter_CheckWithLimitOverrides(t *testing.T) {
// Verify our overrideUsageGauge is being set correctly. 1.0 == 100% of
// the bucket has been consumed.
test.AssertMetricWithLabelsEquals(t, l.overrideUsageGauge, prometheus.Labels{
"limit_name": nameToString[NewRegistrationsPerIPAddress], "client_id": tenZeroZeroTwo}, 1.0)
"limit_name": NewRegistrationsPerIPAddress.String(), "client_id": tenZeroZeroTwo}, 1.0)

// Verify our RetryIn is correct. 1 second == 1000 milliseconds and
// 1000/40 = 25 milliseconds per request.
@@ -337,7 +337,8 @@ func Test_Limiter_RefundAndReset(t *testing.T) {

// Refund 1 requests above our limit, this should fail.
d, err = l.Refund(testCtx, NewRegistrationsPerIPAddress, testIP, 1)
test.AssertErrorIs(t, err, ErrBucketAlreadyFull)
test.AssertNotError(t, err, "should not error")
test.Assert(t, !d.Allowed, "should not be allowed")
test.AssertEquals(t, d.Remaining, int64(20))
})
}
22 changes: 21 additions & 1 deletion ratelimits/names.go
@@ -26,7 +26,13 @@ const (
NewRegistrationsPerIPAddress

// NewRegistrationsPerIPv6Range uses bucket key 'enum:ipv6rangeCIDR'. The
// address range must be a /48.
// address range must be a /48. RFC 3177, which was published in 2001,
// advised operators to allocate a /48 block of IPv6 addresses for most end
// sites. RFC 6177, which was published in 2011 and obsoletes RFC 3177,
// advises allocating a smaller /56 block. We've chosen to use the larger
// /48 block for our IPv6 rate limiting. See:
// 1. https://tools.ietf.org/html/rfc3177#section-3
// 2. https://datatracker.ietf.org/doc/html/rfc6177#section-2
NewRegistrationsPerIPv6Range

// NewOrdersPerAccount uses bucket key 'enum:regId'.
@@ -47,6 +53,20 @@ const (
CertificatesPerFQDNSetPerAccount
)

// isValid returns true if the Name is a valid rate limit name.
func (n Name) isValid() bool {
return n > Unknown && n < Name(len(nameToString))
}

// String returns the string representation of the Name. It allows Name to
// satisfy the fmt.Stringer interface.
func (n Name) String() string {
if !n.isValid() {
return nameToString[Unknown]
}
return nameToString[n]
}

// nameToString is a map of Name values to string names.
var nameToString = map[Name]string{
Unknown: "Unknown",
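One more sketch (not part of the diff): String is total over Name, so any value outside the declared range falls back to the "Unknown" entry instead of hitting a missing map key.

package main

import (
	"fmt"

	"github.com/letsencrypt/boulder/ratelimits"
)

func main() {
	// A declared limit prints its entry from nameToString.
	fmt.Println(ratelimits.NewRegistrationsPerIPv6Range)
	// An out-of-range value fails isValid, so String falls back to "Unknown".
	fmt.Println(ratelimits.Name(999))
}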