Skip to content

Commit

Permalink
redis: Add support for *redis.Ring shard configuration using SRV records
Browse files Browse the repository at this point in the history
  • Loading branch information
beautifulentropy committed Aug 18, 2023
1 parent 3b00623 commit 6b64fe8
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 0 deletions.
154 changes: 154 additions & 0 deletions redis/lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package redis

import (
"context"
"fmt"
"net"
"strings"
"time"

"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"

"github.com/redis/go-redis/v9"
)

// Lookup is a helper that keeps *redis.Ring shards up to date using SRV
// lookups.
type Lookup struct {
// service is the symbolic name of the desired service.
service string

// domain is the domain name of the desired service.
domain string

// updateFrequency is the frequency of periodic SRV lookups. Defaults to 30
// seconds.
updateFrequency time.Duration

// dnsAuthority is the single <hostname|IPv4|[IPv6]>:<port> of the DNS
// server to be used for SRV lookups. If the address contains a hostname it
// will be resolved via the system DNS. If the port is left unspecified it
// will default to '53'. If this field is left unspecified the system DNS
// will be used for resolution.
dnsAuthority string

ring *redis.Ring
logger blog.Logger
}

// NewLookup returns a new Lookup helper.
func NewLookup(srv cmd.ServiceDomain, dnsAuthority string, frequency time.Duration, ring *redis.Ring, logger blog.Logger) *Lookup {
if frequency == 0 {
// Use default frequency.
frequency = 30 * time.Second
}
if dnsAuthority != "" {
host, port, err := net.SplitHostPort(dnsAuthority)
if err != nil {
// Assume only hostname or IPv4 address was specified.
host = dnsAuthority
port = "53"
}
dnsAuthority = net.JoinHostPort(host, port)
}
return &Lookup{
service: srv.Service,
domain: srv.Domain,
updateFrequency: frequency,
dnsAuthority: dnsAuthority,
ring: ring,
logger: logger,
}
}

// getResolver returns a resolver that will be used to perform SRV lookups.
func (look *Lookup) getResolver() *net.Resolver {
if look.dnsAuthority == "" {
return net.DefaultResolver
}
return &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return net.Dial(network, look.dnsAuthority)
},
}
}

// dnsName returns DNS name to look up as defined in RFC 2782.
func (look *Lookup) dnsName() string {
return fmt.Sprintf("_%s._tcp.%s", look.service, look.domain)
}

// LookupShards performs SRV lookups for the given service name and returns the
// resolved shard addresses.
func (look *Lookup) Shards(ctx context.Context) (map[string]string, error) {
resolver := look.getResolver()

_, addrs, err := resolver.LookupSRV(ctx, look.service, "tcp", look.domain)
if err != nil {
return nil, fmt.Errorf("failed to lookup SRV records for service %q: %w", look.dnsName(), err)
}

if len(addrs) <= 0 {
return nil, fmt.Errorf("no SRV targets found for service %q", look.dnsName())
}

newAddrs := make(map[string]string)

for _, srv := range addrs {
host := strings.TrimRight(srv.Target, ".")

if look.dnsAuthority != "" {
// Lookup A/AAAA records for the SRV target using the custom DNS
// authority.
hostAddrs, err := resolver.LookupHost(ctx, host)
if err != nil {
return nil, fmt.Errorf("failed to lookup A/AAAA records for %q: %w", host, err)
}
if len(hostAddrs) <= 0 {
return nil, fmt.Errorf("no A/AAAA records found for %q", host)
}
// Use the first resolved IP address.
host = hostAddrs[0]
}

addr := fmt.Sprintf("%s:%d", host, srv.Port)
newAddrs[addr] = addr
}
return newAddrs, nil
}

// ShardsPeriodically periodically performs SRV lookups for the given service
// name and updates the ring shards accordingly.
func (look *Lookup) ShardsPeriodically(ctx context.Context, frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()

for {
select {
case <-ticker.C:
timeoutCtx, cancel := context.WithTimeout(ctx, frequency)
newAddrs, err := look.Shards(timeoutCtx)
cancel()
if err != nil {
look.logger.Errf(err.Error())
continue
}
look.ring.SetAddrs(newAddrs)

case <-ctx.Done():
return
}
}
}

// Start starts the periodic SRV lookups.
func (look *Lookup) Start(ctx context.Context) {
addrs, err := look.Shards(ctx)
if err != nil {
panic(err)
}
look.ring.SetAddrs(addrs)
go look.ShardsPeriodically(ctx, look.updateFrequency)
}
78 changes: 78 additions & 0 deletions redis/lookup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package redis

import (
"context"
"testing"
"time"

"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/test"

"github.com/redis/go-redis/v9"
)

func newTestRedisRing() *redis.Ring {
CACertFile := "../test/redis-tls/minica.pem"
CertFile := "../test/redis-tls/boulder/cert.pem"
KeyFile := "../test/redis-tls/boulder/key.pem"
tlsConfig := cmd.TLSConfig{
CACertFile: CACertFile,
CertFile: CertFile,
KeyFile: KeyFile,
}
tlsConfig2, err := tlsConfig.Load(metrics.NoopRegisterer)
if err != nil {
panic(err)
}

client := redis.NewRing(&redis.RingOptions{
Username: "unittest-rw",
Password: "824968fa490f4ecec1e52d5e34916bdb60d45f8d",
TLSConfig: tlsConfig2,
})
return client
}

func Test_Lookup(t *testing.T) {
t.Parallel()

logger := blog.NewMock()
ring := newTestRedisRing()

lookup := NewLookup(cmd.ServiceDomain{
Service: "redisratelimits",
Domain: "service.consul",
},
"consul.service.consul",
250*time.Millisecond,
ring,
logger,
)

testCtx, cancel := context.WithCancel(context.Background())
defer cancel()

lookup.Start(testCtx)

// The Consul service entry for 'redisratelimits' is configured to return
// two SRV targets. We should only have two shards in the ring.
test.Assert(t, ring.Len() == 2, "Expected 2 shards in the ring")

// Ensure we can reach both shards using the PING command.
err := ring.ForEachShard(testCtx, func(ctx context.Context, shard *redis.Client) error {
return shard.Ping(ctx).Err()
})
test.AssertNotError(t, err, "Expected PING to succeed for both shards")

// Drop both Shards from the ring.
ring.SetAddrs(map[string]string{})
test.Assert(t, ring.Len() == 0, "Expected 0 shards in the ring")

// Sleep 300ms to allow the periodic lookup to run.
time.Sleep(300 * time.Millisecond)

// The ring should now have two shards again.
test.Assert(t, ring.Len() == 2, "Expected 2 shards in the ring")
}
16 changes: 16 additions & 0 deletions test/consul/config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,22 @@ services {
tags = ["tcp"] // Required for SRV RR support in gRPC DNS resolution.
}

services {
id = "bredis3"
name = "redisratelimits"
address = "10.33.33.4"
port = 4218
tags = ["tcp"] // Required for SRV RR support in DNS resolution.
}

services {
id = "bredis4"
name = "redisratelimits"
address = "10.33.33.5"
port = 4218
tags = ["tcp"] // Required for SRV RR support in DNS resolution.
}

//
// The following services are used for testing the gRPC DNS resolver.
//
Expand Down

0 comments on commit 6b64fe8

Please sign in to comment.