Greatly simplify crl-updater's batch mode #7079

Merged · 6 commits · Sep 19, 2023
2 changes: 1 addition & 1 deletion cmd/crl-updater/main.go
@@ -190,7 +190,7 @@ func main() {
go cmd.CatchSignals(cancel)

if *runOnce {
err = u.RunOnce(ctx, clk.Now())
err = u.RunOnce(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
cmd.FailOnError(err, "")
}
6 changes: 3 additions & 3 deletions crl/crl.go
@@ -30,13 +30,13 @@ func Number(thisUpdate time.Time) number {
type id string

// Id is a utility function which constructs a new `id`.
func Id(issuerID issuance.IssuerNameID, crlNumber number, shardIdx int) id {
func Id(issuerID issuance.IssuerNameID, shardIdx int, crlNumber number) id {
type info struct {
IssuerID issuance.IssuerNameID `json:"issuerID"`
CRLNumber number `json:"crlNumber"`
ShardIdx int `json:"shardIdx"`
CRLNumber number `json:"crlNumber"`
}
jsonBytes, err := json.Marshal(info{issuerID, crlNumber, shardIdx})
jsonBytes, err := json.Marshal(info{issuerID, shardIdx, crlNumber})
if err != nil {
panic(err)
}
4 changes: 2 additions & 2 deletions crl/crl_test.go
@@ -11,7 +11,7 @@ import (

func TestId(t *testing.T) {
thisUpdate := time.Now()
out := Id(1337, Number(thisUpdate), 1)
expectCRLId := fmt.Sprintf("{\"issuerID\":1337,\"crlNumber\":%d,\"shardIdx\":1}", big.NewInt(thisUpdate.UnixNano()))
out := Id(1337, 1, Number(thisUpdate))
expectCRLId := fmt.Sprintf("{\"issuerID\":1337,\"shardIdx\":1,\"crlNumber\":%d}", big.NewInt(thisUpdate.UnixNano()))
test.AssertEquals(t, string(out), expectCRLId)
}
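A minimal sketch (not part of this PR) of why the struct field swap in crl.go matters: encoding/json emits struct fields in declaration order, so moving ShardIdx ahead of CRLNumber is what changes the key order in the marshaled id, matching the reordered function parameters. The types below are simplified stand-ins for issuance.IssuerNameID and the crl package's number type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// info mirrors the shape of the anonymous struct in crl.Id, with
// simplified field types for illustration only.
type info struct {
	IssuerID  int64 `json:"issuerID"`
	ShardIdx  int   `json:"shardIdx"`
	CRLNumber int64 `json:"crlNumber"`
}

func main() {
	// encoding/json preserves struct field declaration order, so the
	// keys come out as issuerID, shardIdx, crlNumber.
	out, _ := json.Marshal(info{IssuerID: 1337, ShardIdx: 1, CRLNumber: 1694736000000000000})
	fmt.Println(string(out))
	// {"issuerID":1337,"shardIdx":1,"crlNumber":1694736000000000000}
}
```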
2 changes: 1 addition & 1 deletion crl/storer/storer.go
@@ -133,7 +133,7 @@ func (cs *crlStorer) UploadCRL(stream cspb.CRLStorer_UploadCRLServer) error {
return errors.New("got no metadata message")
}

crlId := crl.Id(issuer.NameID(), crlNumber, int(shardIdx))
crlId := crl.Id(issuer.NameID(), int(shardIdx), crlNumber)

cs.sizeHistogram.WithLabelValues(issuer.Subject.CommonName).Observe(float64(len(crlBytes)))

266 changes: 42 additions & 224 deletions crl/updater/batch.go
@@ -3,254 +3,72 @@ package updater
import (
"context"
"errors"
"fmt"
"math"
"math/big"
"sort"
"strings"
"time"

"google.golang.org/protobuf/types/known/emptypb"
"sync"

"github.com/letsencrypt/boulder/crl"
"github.com/letsencrypt/boulder/issuance"
)

// RunOnce runs the entire update process once immediately. It processes each
// configured issuer serially, and processes all of them even if an early one
// encounters an error. All errors encountered are returned as a single combined
// error at the end.
func (cu *crlUpdater) RunOnce(ctx context.Context, atTime time.Time) (err error) {
defer func() {
// This func closes over the named return value `err`, so can reference it.
result := "success"
if err != nil {
result = "failed"
}
cu.tickHistogram.WithLabelValues("all", result).Observe(cu.clk.Since(atTime).Seconds())
}()
cu.log.Debugf("Ticking at time %s", atTime)

var errIssuers []string
for id := range cu.issuers {
// For now, process each issuer serially. This keeps the worker pool system
// simple, and processing all of the issuers in parallel likely wouldn't
// meaningfully speed up the overall process.
err := cu.updateIssuer(ctx, atTime, id)
if err != nil {
cu.log.AuditErrf(
"Generating CRLs for issuer failed: number=[%d] issuer=[%s] err=[%s]",
(*big.Int)(crl.Number(atTime)), cu.issuers[id].Subject.CommonName, err)
errIssuers = append(errIssuers, cu.issuers[id].Subject.CommonName)
}
}
// RunOnce causes the crlUpdater to update every shard immediately, then exit.
// It will run as many simultaneous goroutines as the configured maxParallelism.
func (cu *crlUpdater) RunOnce(ctx context.Context) error {
var wg sync.WaitGroup
atTime := cu.clk.Now()

if len(errIssuers) != 0 {
return fmt.Errorf("%d issuers failed: %v", len(errIssuers), strings.Join(errIssuers, ", "))
type workItem struct {
issuerNameID issuance.IssuerNameID
shardIdx int
}
return nil
}

// updateIssuer performs the full CRL issuance cycle for a single issuer cert. It
// processes all of the shards of this issuer's CRL concurrently, and processes
// all of them even if an early one encounters an error. All errors encountered
// are returned as a single combined error at the end.
func (cu *crlUpdater) updateIssuer(ctx context.Context, atTime time.Time, issuerNameID issuance.IssuerNameID) (err error) {
start := cu.clk.Now()
defer func() {
// This func closes over the named return value `err`, so can reference it.
result := "success"
if err != nil {
result = "failed"
}
cu.tickHistogram.WithLabelValues(cu.issuers[issuerNameID].Subject.CommonName+" (Overall)", result).Observe(cu.clk.Since(start).Seconds())
}()
cu.log.Debugf("Ticking issuer %d at time %s", issuerNameID, atTime)
var anyErr bool
var once sync.Once

shardMap, err := cu.getShardMappings(ctx, atTime)
if err != nil {
return fmt.Errorf("computing shardmap: %w", err)
}

type shardResult struct {
shardIdx int
err error
}
shardWorker := func(in <-chan workItem) {
defer wg.Done()

shardWorker := func(in <-chan int, out chan<- shardResult) {
for idx := range in {
for {
select {
case <-ctx.Done():
return
default:
out <- shardResult{
shardIdx: idx,
err: cu.updateShardWithRetry(ctx, atTime, issuerNameID, idx, shardMap[idx]),
case work, ok := <-in:
if !ok {
return
}
// We want to renumber shard 0 to also be shard numShards (e.g. 128).
// To facilitate that transition, produce the same CRL with both shard
// indices.
// TODO(#7007): Collapse this when we don't need to produce both anymore.
if idx == 0 {
out <- shardResult{
shardIdx: cu.numShards,
err: cu.updateShardWithRetry(ctx, atTime, issuerNameID, cu.numShards, shardMap[idx]),
}
err := cu.updateShardWithRetry(ctx, atTime, work.issuerNameID, work.shardIdx, nil)
if err != nil {
cu.log.AuditErrf(
"Generating CRL failed: id=[%s] err=[%s]",
crl.Id(work.issuerNameID, work.shardIdx, crl.Number(atTime)), err)
once.Do(func() { anyErr = true })
}
}
}
}

shardIdxs := make(chan int, cu.numShards)
shardResults := make(chan shardResult, cu.numShards)
for i := 0; i < cu.maxParallelism; i++ {
go shardWorker(shardIdxs, shardResults)
}
inputs := make(chan workItem)

// TODO(#7007): Iterate from 1 to numShards instead of 0 to numShards-1.
for shardIdx := 0; shardIdx < cu.numShards; shardIdx++ {
shardIdxs <- shardIdx
}
close(shardIdxs)

var errShards []int
// TODO(#7007): Reduce this to cu.numShards when we stop producing shard 0.
for i := 0; i < cu.numShards+1; i++ {
res := <-shardResults
if res.err != nil {
cu.log.AuditErrf(
"Generating CRL failed: id=[%s] err=[%s]",
crl.Id(issuerNameID, crl.Number(atTime), res.shardIdx), res.err)
errShards = append(errShards, res.shardIdx)
}
}

if len(errShards) != 0 {
sort.Ints(errShards)
return fmt.Errorf("%d shards failed: %v", len(errShards), errShards)
}
return nil
}

// anchorTime is used as a universal starting point against which other times
// can be compared. This time must be less than 290 years (2^63-1 nanoseconds)
// in the past, to ensure that Go's time.Duration can represent that difference.
// The significance of 2015-06-04 11:04:38 UTC is left as an exercise to the
// reader.
func anchorTime() time.Time {
return time.Date(2015, time.June, 04, 11, 04, 38, 0, time.UTC)
}

// chunk represents a fixed slice of time during which some certificates
// presumably expired or will expire. Its non-unique index indicates which shard
// it will be mapped to. The start boundary is inclusive, the end boundary is
// exclusive.
type chunk struct {
start time.Time
end time.Time
idx int
}

// shardMap is a mapping of shard indices to the set of chunks which should be
// included in that shard. Under most circumstances there is a one-to-one
// mapping, but certain configuration (such as having very narrow shards, or
// having a very long lookback period) can result in more than one chunk being
// mapped to a single shard.
type shardMap [][]chunk

// getShardMappings determines which chunks are currently relevant, based on
// the current time, the configured lookbackPeriod, and the farthest-future
// certificate expiration in the database. It then maps all of those chunks to
// their corresponding shards, and returns that mapping.
//
// The idea here is that shards should be stable. Picture a timeline, divided
// into chunks. Number those chunks from 0 (starting at the anchor time) up to
// numShards, then repeat the cycle when you run out of numbers:
//
// chunk: 0 1 2 3 4 0 1 2 3 4 0
// |-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----...
// ^-anchorTime
//
// The total time window we care about goes from atTime-lookbackPeriod, forward
// through the time of the farthest-future notAfter date found in the database.
// The lookbackPeriod must be larger than the updatePeriod, to ensure that any
// certificates which were both revoked *and* expired since the last time we
// issued CRLs get included in this generation. Because these times are likely
// to fall in the middle of chunks, we include the whole chunks surrounding
// those times in our output CRLs:
//
// included chunk: 4 0 1 2 3 4 0 1
// ...--|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----...
// atTime-lookbackPeriod-^ ^-atTime lastExpiry-^
//
// Because this total period of time may include multiple chunks with the same
// number, we then coalesce these chunks into a single shard. Ideally, this
// will never happen: it should only happen if the lookbackPeriod is very
// large, or if the shardWidth is small compared to the lastExpiry (such that
// numShards * shardWidth is less than lastExpiry - atTime). In this example,
// shards 0, 1, and 4 all get the contents of two chunks mapped to them, while
// shards 2 and 3 get only one chunk each.
//
// included chunk: 4 0 1 2 3 4 0 1
// ...--|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----...
// │ │ │ │ │ │ │ │
// shard 0: <────────────────┘─────────────────────────────┘ │
// shard 1: <──────────────────────┘─────────────────────────────┘
// shard 2: <────────────────────────────┘ │ │
// shard 3: <──────────────────────────────────┘ │
// shard 4: <──────────┘─────────────────────────────┘
//
// Under this scheme, the shard to which any given certificate will be mapped is
// a function of only three things: that certificate's notAfter timestamp, the
// chunk width, and the number of shards.
func (cu *crlUpdater) getShardMappings(ctx context.Context, atTime time.Time) (shardMap, error) {
res := make(shardMap, cu.numShards)

// Get the farthest-future expiration timestamp to ensure we cover everything.
lastExpiry, err := cu.sa.GetMaxExpiration(ctx, &emptypb.Empty{})
if err != nil {
return nil, err
}

// Find the id number and boundaries of the earliest chunk we care about.
first := atTime.Add(-cu.lookbackPeriod)
c, err := cu.getChunkAtTime(first)
if err != nil {
return nil, err
for i := 0; i < cu.maxParallelism; i++ {
wg.Add(1)
go shardWorker(inputs)
}

// Iterate over chunks until we get completely beyond the farthest-future
// expiration.
for c.start.Before(lastExpiry.AsTime()) {
res[c.idx] = append(res[c.idx], c)
c = chunk{
start: c.end,
end: c.end.Add(cu.shardWidth),
idx: (c.idx + 1) % cu.numShards,
for _, issuer := range cu.issuers {
// TODO(#7007): Start at index 1 when we stop producing shard 0.
for i := 0; i <= cu.numShards; i++ {
select {
case <-ctx.Done():
close(inputs)
wg.Wait()
return ctx.Err()
case inputs <- workItem{issuerNameID: issuer.NameID(), shardIdx: i}:
}
}
}
close(inputs)

return res, nil
}
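As an illustrative aside (not part of this diff), the chunk-to-shard arithmetic described in the getShardMappings comment above, and implemented by the getChunkAtTime helper removed below, reduces to a division and a modulus against the anchor time. The shard width and shard count here are hypothetical values, not Boulder configuration.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	anchor := time.Date(2015, time.June, 4, 11, 4, 38, 0, time.UTC)
	shardWidth := 10 * time.Hour // hypothetical
	numShards := 128             // hypothetical

	// Some certificate expiration time we want to map to a chunk.
	notAfter := time.Date(2023, time.September, 19, 0, 0, 0, 0, time.UTC)

	// How many whole chunks lie between the anchor and this timestamp.
	chunksSinceAnchor := notAfter.Sub(anchor).Nanoseconds() / shardWidth.Nanoseconds()
	// The chunk (and thus shard) index wraps around modulo numShards.
	chunkIdx := int(chunksSinceAnchor) % numShards

	// The chunk's inclusive start and exclusive end boundaries.
	start := notAfter.Add(-time.Duration(notAfter.Sub(anchor).Nanoseconds() % shardWidth.Nanoseconds()))
	end := start.Add(shardWidth)

	fmt.Println(chunkIdx, start, end)
}
```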

// getChunkAtTime returns the chunk whose boundaries contain the given time.
// It is broken out solely for the purpose of unit testing.
func (cu *crlUpdater) getChunkAtTime(atTime time.Time) (chunk, error) {
// Compute the amount of time between the current time and the anchor time.
timeSinceAnchor := atTime.Sub(anchorTime())
if timeSinceAnchor == time.Duration(math.MaxInt64) || timeSinceAnchor < 0 {
return chunk{}, errors.New("shard boundary math broken: anchor time too far away")
wg.Wait()
if anyErr {
return errors.New("one or more errors encountered, see logs")
}

// Determine how many full chunks fit within that time, and from that the
// index number of the desired chunk.
chunksSinceAnchor := timeSinceAnchor.Nanoseconds() / cu.shardWidth.Nanoseconds()
chunkIdx := int(chunksSinceAnchor) % cu.numShards

// Determine the boundaries of the chunk.
timeSinceChunk := time.Duration(timeSinceAnchor.Nanoseconds() % cu.shardWidth.Nanoseconds())
left := atTime.Add(-timeSinceChunk)
right := left.Add(cu.shardWidth)

return chunk{left, right, chunkIdx}, nil
return ctx.Err()
}
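For readers skimming the diff, here is a minimal, self-contained sketch of the bounded worker-pool shape the new RunOnce takes: a fixed number of goroutines drain a channel of work items, a WaitGroup tracks completion, context cancellation stops feeding work, and a sync.Once-guarded flag records whether any item failed. The workItem fields and the doWork stub are hypothetical stand-ins, not Boulder APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// workItem and doWork stand in for the updater's per-shard work and its
// updateShardWithRetry call.
type workItem struct{ issuer, shard int }

func doWork(ctx context.Context, w workItem) error { return nil }

func runOnce(ctx context.Context, parallelism, issuers, shards int) error {
	var wg sync.WaitGroup
	var anyErr bool
	var once sync.Once

	inputs := make(chan workItem)
	for i := 0; i < parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker drains work items until the channel is closed.
			for w := range inputs {
				if err := doWork(ctx, w); err != nil {
					once.Do(func() { anyErr = true })
				}
			}
		}()
	}

	for i := 0; i < issuers; i++ {
		for s := 0; s < shards; s++ {
			select {
			case <-ctx.Done():
				// Stop feeding work, let in-flight items finish, and report.
				close(inputs)
				wg.Wait()
				return ctx.Err()
			case inputs <- workItem{issuer: i, shard: s}:
			}
		}
	}
	close(inputs)

	wg.Wait()
	if anyErr {
		return errors.New("one or more work items failed")
	}
	return ctx.Err()
}

func main() {
	fmt.Println(runOnce(context.Background(), 4, 2, 8))
}
```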