Skip to content

Commit

Permalink
perf improvements for loading entities in unseal
Browse files Browse the repository at this point in the history
  • Loading branch information
miagilepner committed Jan 9, 2025
1 parent ab4e8da commit 048b760
Showing 1 changed file with 68 additions and 17 deletions.
85 changes: 68 additions & 17 deletions vault/identity_store_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

metrics "github.com/armon/go-metrics"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/errwrap"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-secure-stdlib/strutil"
Expand Down Expand Up @@ -362,10 +363,10 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
// create a slice of result channels, one for each bucket. We need each result
// and err chan to be 1 buffered so we can leave a result there even if the
// processing loop is blocking on an earlier bucket still.
results := make([]chan *storagepacker.Bucket, len(existing))
results := make([]chan []*identity.Entity, len(existing))
errs := make([]chan error, len(existing))
for j := range existing {
results[j] = make(chan *storagepacker.Bucket, 1)
results[j] = make(chan []*identity.Entity, 1)
errs[j] = make(chan error, 1)
}

Expand Down Expand Up @@ -393,8 +394,18 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
continue
}

items := make([]*identity.Entity, len(bucket.Items))
for j, item := range bucket.Items {
entity, err := i.parseEntityFromBucketItem(ctx, item)
if err != nil {
errs[idx] <- err
continue
}
items[j] = entity
}

// Write results out to the result channel
results[idx] <- bucket
results[idx] <- items

// quit early
case <-quit:
Expand Down Expand Up @@ -422,6 +433,8 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
close(broker)
}()

localAliasBuckets := make(map[string]*storagepacker.Bucket)

// Restore each key by pulling from the result chan
LOOP:
for j := range existing {
Expand All @@ -431,17 +444,15 @@ LOOP:
close(quit)
break LOOP

case bucket := <-results[j]:
case entities := <-results[j]:
// If there is no entry, nothing to restore
if bucket == nil {
if entities == nil {
continue
}

for _, item := range bucket.Items {
entity, err := i.parseEntityFromBucketItem(ctx, item)
if err != nil {
return err
}
tx := i.db.Txn(true)
upsertedItems := 0
for _, entity := range entities {
if entity == nil {
continue
}
Expand Down Expand Up @@ -486,22 +497,28 @@ LOOP:
}
}

localAliases, err := i.parseLocalAliases(entity.ID)
err = i.loadLocalAliasesForEntity(ctx, entity, localAliasBuckets)
if err != nil {
return fmt.Errorf("failed to load local aliases from storage: %v", err)
}
if localAliases != nil {
for _, alias := range localAliases.Aliases {
entity.UpsertAlias(alias)
}
}

toBeUpserted := 1 + len(entity.Aliases)
if upsertedItems+toBeUpserted > 1024 {
tx.Commit()
upsertedItems = 0
tx = i.db.Txn(true)
}
// Only update MemDB and don't hit the storage again
err = i.upsertEntity(nsCtx, entity, nil, false)
err = i.upsertEntityInTxn(nsCtx, tx, entity, nil, false)
if err != nil {
return fmt.Errorf("failed to update entity in MemDB: %w", err)
}
upsertedItems += toBeUpserted
}
if upsertedItems > 0 {
tx.Commit()
}
tx.Abort()
}
}

Expand All @@ -526,6 +543,40 @@ LOOP:
return nil
}

// loadLocalAliasesForEntity upserts local aliases into the entity by retrieving
// the local aliases from the cache (if present) or storage
func (i *IdentityStore) loadLocalAliasesForEntity(ctx context.Context, entity *identity.Entity, localAliasCache map[string]*storagepacker.Bucket) error {
bucketKey := i.localAliasPacker.BucketKey(entity.ID)
if len(bucketKey) == 0 {
return fmt.Errorf("no bucket key for ID %s", entity.ID)
}
bucket, ok := localAliasCache[bucketKey]
if !ok {
var err error
bucket, err = i.localAliasPacker.GetBucket(ctx, bucketKey)
if err != nil {
return fmt.Errorf("failed to load local alias bucket from storage: %v", err)
}
localAliasCache[bucketKey] = bucket
}
if bucket == nil {
return nil
}
for _, item := range bucket.Items {
if item.ID == entity.ID {
var localAliases identity.LocalAliases
err := ptypes.UnmarshalAny(item.Message, &localAliases)
if err != nil {
return fmt.Errorf("failed to unmarshal local alias: %v", err)
}
for _, alias := range localAliases.Aliases {
entity.UpsertAlias(alias)
}
}
}
return nil
}

// getAccessorsOnDuplicateAliases returns a list of accessors by checking aliases in
// the passed in list which belong to the same accessor(s)
func getAccessorsOnDuplicateAliases(aliases []*identity.Alias) []string {
Expand Down

0 comments on commit 048b760

Please sign in to comment.