crl-updater: lease CRL shards to prevent races (#6941)
Add a new feature flag, LeaseCRLShards, which controls certain aspects
of crl-updater's behavior.

When this flag is enabled, crl-updater calls the new SA.LeaseCRLShard
method before beginning work on a shard. This prevents it from stepping
on the toes of another crl-updater instance which may be working on the
same shard. This is important to prevent two competing instances from
accidentally updating a CRL's Number (which is an integer representation
of its thisUpdate timestamp) *backwards*, which would be a compliance
violation.
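
To illustrate why the race matters, here is a minimal sketch, assuming the UnixNano-based Number encoding in Boulder's crl package (the helper name here is for the sketch only):

package main

import (
    "fmt"
    "math/big"
    "time"
)

// crlNumber sketches how a CRL's Number is derived from its thisUpdate
// timestamp (assumed: the UnixNano encoding used in crl/crl.go).
func crlNumber(thisUpdate time.Time) *big.Int {
    return big.NewInt(thisUpdate.UnixNano())
}

func main() {
    fast := time.Date(2023, time.July, 20, 10, 0, 0, 0, time.UTC)
    slow := fast.Add(-time.Hour) // a lagging competitor's older atTime
    // If the lagging updater uploads last, the published Number regresses:
    fmt.Println(crlNumber(slow).Cmp(crlNumber(fast)) < 0) // true
}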

When this flag is enabled, crl-updater also calls the new
SA.UpdateCRLShard method after finishing work on a shard.

In the future, additional work will be done to make crl-updater use the
"give me the oldest available shard" mode of the LeaseCRLShard method.

Fixes #6897
aarongable authored and pgporada committed Jul 20, 2023
1 parent dcb078c commit 6e6037e
Showing 10 changed files with 199 additions and 51 deletions.
13 changes: 12 additions & 1 deletion cmd/crl-updater/main.go
@@ -81,6 +81,13 @@ type Config struct {
// less than the UpdatePeriod.
UpdateOffset config.Duration

// UpdateTimeout controls how long a single CRL shard is allowed to attempt
// to update before being timed out. The total CRL updating process may take
// significantly longer, since a full update cycle may consist of updating
// many shards with varying degrees of parallelism. This value must be
// strictly less than the UpdatePeriod. Defaults to 1 hour.
UpdateTimeout config.Duration `validate:"-"`

// MaxParallelism controls how many workers may be running in parallel.
// A higher value reduces the total time necessary to update all CRL shards
// that this updater is responsible for, but also increases the memory used
@@ -142,10 +149,13 @@ func main() {
if c.CRLUpdater.LookbackPeriod.Duration == 0 {
c.CRLUpdater.LookbackPeriod.Duration = 24 * time.Hour
}
if c.CRLUpdater.UpdateTimeout.Duration == 0 {
c.CRLUpdater.UpdateTimeout.Duration = 1 * time.Hour
}

saConn, err := bgrpc.ClientSetup(c.CRLUpdater.SAService, tlsConfig, scope, clk)
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
sac := sapb.NewStorageAuthorityReadOnlyClient(saConn)
sac := sapb.NewStorageAuthorityClient(saConn)

caConn, err := bgrpc.ClientSetup(c.CRLUpdater.CRLGeneratorService, tlsConfig, scope, clk)
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to CRLGenerator")
@@ -162,6 +172,7 @@ func main() {
c.CRLUpdater.LookbackPeriod.Duration,
c.CRLUpdater.UpdatePeriod.Duration,
c.CRLUpdater.UpdateOffset.Duration,
c.CRLUpdater.UpdateTimeout.Duration,
c.CRLUpdater.MaxParallelism,
c.CRLUpdater.MaxAttempts,
sac,
42 changes: 40 additions & 2 deletions crl/updater/updater.go
@@ -15,12 +15,14 @@ import (
"github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

capb "github.com/letsencrypt/boulder/ca/proto"
"github.com/letsencrypt/boulder/core"
"github.com/letsencrypt/boulder/core/proto"
"github.com/letsencrypt/boulder/crl"
cspb "github.com/letsencrypt/boulder/crl/storer/proto"
"github.com/letsencrypt/boulder/features"
"github.com/letsencrypt/boulder/issuance"
blog "github.com/letsencrypt/boulder/log"
sapb "github.com/letsencrypt/boulder/sa/proto"
@@ -33,10 +35,11 @@ type crlUpdater struct {
lookbackPeriod time.Duration
updatePeriod time.Duration
updateOffset time.Duration
updateTimeout time.Duration
maxParallelism int
maxAttempts int

sa sapb.StorageAuthorityReadOnlyClient
sa sapb.StorageAuthorityClient
ca capb.CRLGeneratorClient
cs cspb.CRLStorerClient

@@ -54,9 +57,10 @@ func NewUpdater(
lookbackPeriod time.Duration,
updatePeriod time.Duration,
updateOffset time.Duration,
updateTimeout time.Duration,
maxParallelism int,
maxAttempts int,
sa sapb.StorageAuthorityReadOnlyClient,
sa sapb.StorageAuthorityClient,
ca capb.CRLGeneratorClient,
cs cspb.CRLStorerClient,
stats prometheus.Registerer,
@@ -80,6 +84,10 @@
return nil, fmt.Errorf("update offset must be less than period: %s !< %s", updateOffset, updatePeriod)
}

if updateTimeout >= updatePeriod {
return nil, fmt.Errorf("update timeout must be less than period: %s !< %s", updateTimeout, updatePeriod)
}

if lookbackPeriod < 2*updatePeriod {
return nil, fmt.Errorf("lookbackPeriod must be at least 2x updatePeriod: %s !< 2 * %s", lookbackPeriod, updatePeriod)
}
@@ -112,6 +120,7 @@ func NewUpdater(
lookbackPeriod,
updatePeriod,
updateOffset,
updateTimeout,
maxParallelism,
maxAttempts,
sa,
@@ -250,10 +259,12 @@ func (cu *crlUpdater) tickIssuer(ctx context.Context, atTime time.Time, issuerNa
case <-ctx.Done():
return
default:
ctx, cancel := context.WithTimeout(ctx, cu.updateTimeout)
out <- shardResult{
shardIdx: idx,
err: cu.tickShardWithRetry(ctx, atTime, issuerNameID, idx, shardMap[idx]),
}
cancel()
}
}
}
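
For reference, a minimal sketch of the per-shard timeout pattern applied in the hunk above (a hypothetical helper; the real loop calls cancel explicitly each iteration rather than deferring):

package main

import (
    "context"
    "fmt"
    "time"
)

// runWithTimeout gives one unit of work its own deadline derived from the
// parent context; downstream calls can read it back via ctx.Deadline().
func runWithTimeout(parent context.Context, timeout time.Duration, work func(context.Context) error) error {
    ctx, cancel := context.WithTimeout(parent, timeout)
    defer cancel() // release the timer as soon as the work returns
    return work(ctx)
}

func main() {
    err := runWithTimeout(context.Background(), time.Hour, func(ctx context.Context) error {
        deadline, ok := ctx.Deadline()
        fmt.Println(deadline, ok) // the lease logic below relies on this deadline
        return nil
    })
    fmt.Println(err)
}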
@@ -335,6 +346,24 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
cu.log.Infof(
"Generating CRL shard: id=[%s] numChunks=[%d]", crlID, len(chunks))

if features.Enabled(features.LeaseCRLShards) {
// Notify the database that we're working on this shard.
deadline, ok := ctx.Deadline()
if !ok {
return fmt.Errorf("context has no deadline")
}

_, err = cu.sa.LeaseCRLShard(ctx, &sapb.LeaseCRLShardRequest{
IssuerNameID: int64(issuerNameID),
MinShardIdx: int64(shardIdx),
MaxShardIdx: int64(shardIdx),
Until: timestamppb.New(deadline.Add(-time.Second)),
})
if err != nil {
return fmt.Errorf("leasing shard: %w", err)
}
}

// Get the full list of CRL Entries for this shard from the SA.
var crlEntries []*proto.CRLEntry
for _, chunk := range chunks {
@@ -452,6 +481,15 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
return fmt.Errorf("closing CRLStorer upload stream: %w", err)
}

if features.Enabled(features.LeaseCRLShards) {
// Notify the database that we're done.
_, err = cu.sa.UpdateCRLShard(ctx, &sapb.UpdateCRLShardRequest{
IssuerNameID: int64(issuerNameID),
ShardIdx: int64(shardIdx),
ThisUpdate: timestamppb.New(atTime),
})
if err != nil {
return fmt.Errorf("updating shard: %w", err)
}
}

cu.log.Infof(
"Generated CRL shard: id=[%s] size=[%d] hash=[%x]",
crlID, crlLen, crlHash.Sum(nil))
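
Putting the two feature-flagged hunks together, the shard lifecycle reads roughly as below. This is a condensed, hypothetical sketch, not the actual tickShard body; generation and upload are elided:

package crlsketch

import (
    "context"
    "fmt"
    "time"

    sapb "github.com/letsencrypt/boulder/sa/proto"
    "google.golang.org/protobuf/types/known/timestamppb"
)

func updateShardWithLease(ctx context.Context, sa sapb.StorageAuthorityClient, issuer, shard int64, atTime time.Time) error {
    deadline, ok := ctx.Deadline()
    if !ok {
        return fmt.Errorf("context has no deadline")
    }
    // 1. Claim the shard until just before our own deadline, so a crashed or
    //    timed-out updater's lease lapses on its own.
    _, err := sa.LeaseCRLShard(ctx, &sapb.LeaseCRLShardRequest{
        IssuerNameID: issuer,
        MinShardIdx:  shard,
        MaxShardIdx:  shard,
        Until:        timestamppb.New(deadline.Add(-time.Second)),
    })
    if err != nil {
        return fmt.Errorf("leasing shard: %w", err)
    }

    // 2. Generate and upload the CRL shard (elided).

    // 3. Record the successful thisUpdate, which the planned "oldest shard"
    //    lease mode can use to pick the stalest shard.
    _, err = sa.UpdateCRLShard(ctx, &sapb.UpdateCRLShardRequest{
        IssuerNameID: issuer,
        ShardIdx:     shard,
        ThisUpdate:   timestamppb.New(atTime),
    })
    if err != nil {
        return fmt.Errorf("updating shard: %w", err)
    }
    return nil
}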
48 changes: 36 additions & 12 deletions crl/updater/updater_test.go
@@ -15,6 +15,7 @@ import (
capb "github.com/letsencrypt/boulder/ca/proto"
corepb "github.com/letsencrypt/boulder/core/proto"
cspb "github.com/letsencrypt/boulder/crl/storer/proto"
"github.com/letsencrypt/boulder/features"
"github.com/letsencrypt/boulder/issuance"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
@@ -50,19 +51,27 @@ func (f *fakeGRCC) Recv() (*corepb.CRLEntry, error) {
// fakeGRCC to be used as the return value for calls to GetRevokedCerts, and a
// fake timestamp to serve as the database's maximum notAfter value.
type fakeSAC struct {
mocks.StorageAuthorityReadOnly
mocks.StorageAuthority
grcc fakeGRCC
maxNotAfter time.Time
leaseError error
}

func (f *fakeSAC) GetRevokedCerts(ctx context.Context, _ *sapb.GetRevokedCertsRequest, _ ...grpc.CallOption) (sapb.StorageAuthorityReadOnly_GetRevokedCertsClient, error) {
func (f *fakeSAC) GetRevokedCerts(ctx context.Context, _ *sapb.GetRevokedCertsRequest, _ ...grpc.CallOption) (sapb.StorageAuthority_GetRevokedCertsClient, error) {
return &f.grcc, nil
}

func (f *fakeSAC) GetMaxExpiration(_ context.Context, req *emptypb.Empty, _ ...grpc.CallOption) (*timestamppb.Timestamp, error) {
return timestamppb.New(f.maxNotAfter), nil
}

func (f *fakeSAC) LeaseCRLShard(_ context.Context, req *sapb.LeaseCRLShardRequest, _ ...grpc.CallOption) (*sapb.LeaseCRLShardResponse, error) {
if f.leaseError != nil {
return nil, f.leaseError
}
return &sapb.LeaseCRLShardResponse{IssuerNameID: req.IssuerNameID, ShardIdx: req.MinShardIdx}, nil
}

// fakeGCC is a fake capb.CRLGenerator_GenerateCRLClient which can be
// populated with some CRL entries or an error for use as the return value of
// a faked GenerateCRL call.
@@ -140,13 +149,15 @@ func TestTickShard(t *testing.T) {
test.AssertNotError(t, err, "loading test issuer")

sentinelErr := errors.New("oops")
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

clk := clock.NewFake()
clk.Set(time.Date(2020, time.January, 1, 0, 0, 0, 0, time.UTC))
cu, err := NewUpdater(
[]*issuance.Certificate{e1, r3},
2, 18*time.Hour, 24*time.Hour,
6*time.Hour, 1*time.Minute, 1, 1,
6*time.Hour, time.Minute, time.Hour, 1, 1,
&fakeSAC{grcc: fakeGRCC{}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)},
&fakeCGC{gcc: fakeGCC{}},
&fakeCSC{ucc: fakeUCC{}},
@@ -159,16 +170,29 @@
}

// Ensure that getting no results from the SA still works.
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0, testChunks)
err = cu.tickShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks)
test.AssertNotError(t, err, "empty CRL")
test.AssertMetricWithLabelsEquals(t, cu.updatedCounter, prometheus.Labels{
"issuer": "(TEST) Elegant Elephant E1", "result": "success",
}, 1)
cu.updatedCounter.Reset()

// With leasing enabled, errors while leasing should bubble up early.
_ = features.Set(map[string]bool{"LeaseCRLShards": true})
cu.sa.(*fakeSAC).leaseError = sentinelErr
err = cu.tickShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks)
test.AssertError(t, err, "leasing error")
test.AssertContains(t, err.Error(), "leasing shard")
test.AssertErrorIs(t, err, sentinelErr)
test.AssertMetricWithLabelsEquals(t, cu.updatedCounter, prometheus.Labels{
"issuer": "(TEST) Elegant Elephant E1", "result": "failed",
}, 1)
cu.updatedCounter.Reset()
features.Reset()

// Errors closing the Storer upload stream should bubble up.
cu.cs = &fakeCSC{ucc: fakeUCC{recvErr: sentinelErr}}
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0, testChunks)
err = cu.tickShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks)
test.AssertError(t, err, "storer error")
test.AssertContains(t, err.Error(), "closing CRLStorer upload stream")
test.AssertErrorIs(t, err, sentinelErr)
@@ -179,7 +203,7 @@

// Errors sending to the Storer should bubble up sooner.
cu.cs = &fakeCSC{ucc: fakeUCC{sendErr: sentinelErr}}
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0, testChunks)
err = cu.tickShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks)
test.AssertError(t, err, "storer error")
test.AssertContains(t, err.Error(), "sending CRLStorer metadata")
test.AssertErrorIs(t, err, sentinelErr)
@@ -190,7 +214,7 @@

// Errors reading from the CA should bubble up sooner.
cu.ca = &fakeCGC{gcc: fakeGCC{recvErr: sentinelErr}}
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0, testChunks)
err = cu.tickShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks)
test.AssertError(t, err, "CA error")
test.AssertContains(t, err.Error(), "receiving CRL bytes")
test.AssertErrorIs(t, err, sentinelErr)
@@ -201,7 +225,7 @@

// Errors sending to the CA should bubble up sooner.
cu.ca = &fakeCGC{gcc: fakeGCC{sendErr: sentinelErr}}
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0, testChunks)
err = cu.tickShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks)
test.AssertError(t, err, "CA error")
test.AssertContains(t, err.Error(), "sending CA metadata")
test.AssertErrorIs(t, err, sentinelErr)
@@ -212,7 +236,7 @@

// Errors reading from the SA should bubble up soonest.
cu.sa = &fakeSAC{grcc: fakeGRCC{err: sentinelErr}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)}
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0, testChunks)
err = cu.tickShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks)
test.AssertError(t, err, "database error")
test.AssertContains(t, err.Error(), "retrieving entry from SA")
test.AssertErrorIs(t, err, sentinelErr)
@@ -237,7 +261,7 @@ func TestTickShardWithRetry(t *testing.T) {
cu, err := NewUpdater(
[]*issuance.Certificate{e1, r3},
2, 18*time.Hour, 24*time.Hour,
6*time.Hour, 1*time.Minute, 1, 1,
6*time.Hour, time.Minute, time.Hour, 1, 1,
&fakeSAC{grcc: fakeGRCC{err: sentinelErr}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)},
&fakeCGC{gcc: fakeGCC{}},
&fakeCSC{ucc: fakeUCC{}},
@@ -283,7 +307,7 @@ func TestTickIssuer(t *testing.T) {
cu, err := NewUpdater(
[]*issuance.Certificate{e1, r3},
2, 18*time.Hour, 24*time.Hour,
6*time.Hour, 1*time.Minute, 1, 1,
6*time.Hour, time.Minute, time.Hour, 1, 1,
&fakeSAC{grcc: fakeGRCC{err: errors.New("db no worky")}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)},
&fakeCGC{gcc: fakeGCC{}},
&fakeCSC{ucc: fakeUCC{}},
@@ -319,7 +343,7 @@ func TestTick(t *testing.T) {
cu, err := NewUpdater(
[]*issuance.Certificate{e1, r3},
2, 18*time.Hour, 24*time.Hour,
6*time.Hour, 1*time.Minute, 1, 1,
6*time.Hour, time.Minute, time.Hour, 1, 1,
&fakeSAC{grcc: fakeGRCC{err: errors.New("db no worky")}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)},
&fakeCGC{gcc: fakeGCC{}},
&fakeCSC{ucc: fakeUCC{}},
5 changes: 3 additions & 2 deletions features/featureflag_string.go

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions features/features.go
@@ -66,6 +66,11 @@ const (
// According to the BRs Section 7.1.4.2.2(a), the commonName field is
// Deprecated, and its inclusion is discouraged but not (yet) prohibited.
RequireCommonName

// LeaseCRLShards causes the crl-updater to use the database to control which
// instance of crl-updater is responsible for updating each shard. This flag
// should only be enabled if the `crlShards` table exists in the database.
LeaseCRLShards
)

// List of features and their default value, protected by fMu
@@ -86,6 +91,7 @@ var features = map[FeatureFlag]bool{
CertCheckerRequiresValidations: false,
AsyncFinalize: false,
RequireCommonName: true,
LeaseCRLShards: false,

StoreLintingCertificateInsteadOfPrecertificate: false,
}
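
Usage-wise, the flag is read via features.Enabled as in the updater hunks above, and the tests toggle it with features.Set and features.Reset. A minimal sketch of that pattern (in production the flag would come from the service's JSON config, not shown here):

package main

import (
    "log"

    "github.com/letsencrypt/boulder/features"
)

func main() {
    // Enable the flag programmatically, as updater_test.go does.
    if err := features.Set(map[string]bool{"LeaseCRLShards": true}); err != nil {
        log.Fatal(err)
    }
    defer features.Reset()

    if features.Enabled(features.LeaseCRLShards) {
        log.Print("lease-aware code path active")
    }
}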
