Skip to content

Commit

Permalink
Better prevention and handling of redis waitgroup key expiry issues
Browse files Browse the repository at this point in the history
  • Loading branch information
austonst committed Mar 18, 2024
1 parent 8396708 commit d668875
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
24 changes: 19 additions & 5 deletions datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,13 +819,18 @@ func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err
}

keyProcessingSlot := r.keyProcessingSlot(slot)
err = r.client.Incr(ctx, keyProcessingSlot).Err()

pipe := r.client.TxPipeline()
pipe.Incr(ctx, keyProcessingSlot)
pipe.Expire(ctx, keyProcessingSlot, expiryLock)
_, err = pipe.Exec(ctx)

if err != nil {
return err
}

r.currentSlot = slot
err = r.client.Expire(ctx, keyProcessingSlot, expiryLock).Err()
return err
return nil
}

// EndProcessingSlot signals that a builder process is done handling blocks for the current slot
Expand All @@ -836,9 +841,18 @@ func (r *RedisCache) EndProcessingSlot(ctx context.Context) (err error) {
}

keyProcessingSlot := r.keyProcessingSlot(r.currentSlot)
err = r.client.Decr(ctx, keyProcessingSlot).Err()

pipe := r.client.TxPipeline()
pipe.Decr(ctx, keyProcessingSlot)
pipe.Expire(ctx, keyProcessingSlot, expiryLock)
_, err = pipe.Exec(ctx)

if err != nil {
return err
}

r.currentSlot = 0
return err
return nil
}

// WaitForSlotComplete waits for a slot to be completed by all builder processes
Expand Down
5 changes: 3 additions & 2 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
// Now we release our lock and wait for all other builder processes to wrap up
err := api.redis.EndProcessingSlot(context.Background())
if err != nil {
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
api.log.WithError(err).Error("failed to unlock redis optimistic processing slot")
}
err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1)
if err != nil {
Expand All @@ -860,7 +860,8 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
api.optimisticSlot.Store(headSlot + 1)
err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1)
if err != nil {
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
api.log.WithError(err).Error("failed to lock redis optimistic processing slot")
api.optimisticSlot.Store(0)
}

builders, err := api.db.GetBlockBuilders()
Expand Down

0 comments on commit d668875

Please sign in to comment.