Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(redis): rename tx to pipeliner #499

Merged
merged 1 commit into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,13 @@ func (r *RedisCache) SetObj(key string, value any, expiration time.Duration) (er
}

// SetObjPipelined saves an object in the given Redis key on a Redis pipeline (JSON encoded)
func (r *RedisCache) SetObjPipelined(ctx context.Context, tx redis.Pipeliner, key string, value any, expiration time.Duration) (err error) {
func (r *RedisCache) SetObjPipelined(ctx context.Context, pipeliner redis.Pipeliner, key string, value any, expiration time.Duration) (err error) {
marshalledValue, err := json.Marshal(value)
if err != nil {
return err
}

return tx.Set(ctx, key, marshalledValue, expiration).Err()
return pipeliner.Set(ctx, key, marshalledValue, expiration).Err()
}

func (r *RedisCache) HSetObj(key, field string, value any, expiration time.Duration) (err error) {
Expand Down Expand Up @@ -291,9 +291,9 @@ func (r *RedisCache) CheckAndSetLastSlotAndHashDelivered(slot uint64, hash strin
return r.client.Watch(context.Background(), txf, r.keyLastSlotDelivered, r.keyLastHashDelivered)
}

func (r *RedisCache) GetLastSlotDelivered(ctx context.Context, tx redis.Pipeliner) (slot uint64, err error) {
c := tx.Get(ctx, r.keyLastSlotDelivered)
_, err = tx.Exec(ctx)
func (r *RedisCache) GetLastSlotDelivered(ctx context.Context, pipeliner redis.Pipeliner) (slot uint64, err error) {
c := pipeliner.Get(ctx, r.keyLastSlotDelivered)
_, err = pipeliner.Exec(ctx)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -358,13 +358,13 @@ func (r *RedisCache) GetBestBid(slot uint64, parentHash, proposerPubkey string)
return resp, err
}

func (r *RedisCache) SaveExecutionPayloadCapella(ctx context.Context, tx redis.Pipeliner, slot uint64, proposerPubkey, blockHash string, execPayload *capella.ExecutionPayload) (err error) {
func (r *RedisCache) SaveExecutionPayloadCapella(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, proposerPubkey, blockHash string, execPayload *capella.ExecutionPayload) (err error) {
key := r.keyExecPayloadCapella(slot, proposerPubkey, blockHash)
b, err := execPayload.MarshalSSZ()
if err != nil {
return err
}
return tx.Set(ctx, key, b, expiryBidCache).Err()
return pipeliner.Set(ctx, key, b, expiryBidCache).Err()
}

func (r *RedisCache) GetExecutionPayloadCapella(slot uint64, proposerPubkey, blockHash string) (*common.VersionedExecutionPayload, error) {
Expand All @@ -388,9 +388,9 @@ func (r *RedisCache) GetExecutionPayloadCapella(slot uint64, proposerPubkey, blo
return resp, nil
}

func (r *RedisCache) SaveBidTrace(ctx context.Context, tx redis.Pipeliner, trace *common.BidTraceV2) (err error) {
func (r *RedisCache) SaveBidTrace(ctx context.Context, pipeliner redis.Pipeliner, trace *common.BidTraceV2) (err error) {
key := r.keyCacheBidTrace(trace.Slot, trace.ProposerPubkey.String(), trace.BlockHash.String())
return r.SetObjPipelined(ctx, tx, key, trace, expiryBidCache)
return r.SetObjPipelined(ctx, pipeliner, key, trace, expiryBidCache)
}

// GetBidTrace returns (trace, nil), or (nil, redis.Nil) if the trace does not exist
Expand All @@ -401,10 +401,10 @@ func (r *RedisCache) GetBidTrace(slot uint64, proposerPubkey, blockHash string)
return resp, err
}

func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, tx redis.Pipeliner, slot uint64, builderPubkey, parentHash, proposerPubkey string) (int64, error) {
func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, builderPubkey, parentHash, proposerPubkey string) (int64, error) {
keyLatestBidsTime := r.keyBlockBuilderLatestBidsTime(slot, parentHash, proposerPubkey)
c := tx.HGet(context.Background(), keyLatestBidsTime, builderPubkey)
_, err := tx.Exec(ctx)
c := pipeliner.HGet(context.Background(), keyLatestBidsTime, builderPubkey)
_, err := pipeliner.Exec(ctx)
if errors.Is(err, redis.Nil) {
return 0, nil
} else if err != nil {
Expand All @@ -414,32 +414,32 @@ func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, tx r
}

// SaveBuilderBid saves the latest bid by a specific builder. TODO: use transaction to make these writes atomic
func (r *RedisCache) SaveBuilderBid(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string, receivedAt time.Time, headerResp *common.GetHeaderResponse) (err error) {
func (r *RedisCache) SaveBuilderBid(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string, receivedAt time.Time, headerResp *common.GetHeaderResponse) (err error) {
// save the actual bid
keyLatestBid := r.keyLatestBidByBuilder(slot, parentHash, proposerPubkey, builderPubkey)
err = r.SetObjPipelined(ctx, tx, keyLatestBid, headerResp, expiryBidCache)
err = r.SetObjPipelined(ctx, pipeliner, keyLatestBid, headerResp, expiryBidCache)
if err != nil {
return err
}

// set the time of the request
keyLatestBidsTime := r.keyBlockBuilderLatestBidsTime(slot, parentHash, proposerPubkey)
err = tx.HSet(ctx, keyLatestBidsTime, builderPubkey, receivedAt.UnixMilli()).Err()
err = pipeliner.HSet(ctx, keyLatestBidsTime, builderPubkey, receivedAt.UnixMilli()).Err()
if err != nil {
return err
}
err = tx.Expire(ctx, keyLatestBidsTime, expiryBidCache).Err()
err = pipeliner.Expire(ctx, keyLatestBidsTime, expiryBidCache).Err()
if err != nil {
return err
}

// set the value last, because that's iterated over when updating the best bid, and the payload has to be available
keyLatestBidsValue := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey)
err = tx.HSet(ctx, keyLatestBidsValue, builderPubkey, headerResp.Value().String()).Err()
err = pipeliner.HSet(ctx, keyLatestBidsValue, builderPubkey, headerResp.Value().String()).Err()
if err != nil {
return err
}
return tx.Expire(ctx, keyLatestBidsValue, expiryBidCache).Err()
return pipeliner.Expire(ctx, keyLatestBidsValue, expiryBidCache).Err()
}

type SaveBidAndUpdateTopBidResponse struct {
Expand All @@ -458,19 +458,19 @@ type SaveBidAndUpdateTopBidResponse struct {
TimeUpdateFloor time.Duration
}

func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeliner, trace *common.BidTraceV2, payload *common.BuilderSubmitBlockRequest, getPayloadResponse *common.GetPayloadResponse, getHeaderResponse *common.GetHeaderResponse, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) {
func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis.Pipeliner, trace *common.BidTraceV2, payload *common.BuilderSubmitBlockRequest, getPayloadResponse *common.GetPayloadResponse, getHeaderResponse *common.GetHeaderResponse, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) {
var prevTime, nextTime time.Time
prevTime = time.Now()

// Load latest bids for a given slot+parent+proposer
builderBids, err := NewBuilderBidsFromRedis(ctx, r, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
builderBids, err := NewBuilderBidsFromRedis(ctx, r, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
if err != nil {
return state, err
}

// Load floor value (if not passed in already)
if floorValue == nil {
floorValue, err = r.GetFloorBidValue(ctx, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
floorValue, err = r.GetFloorBidValue(ctx, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
if err != nil {
return state, err
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
// Time to save things in Redis
//
// 1. Save the execution payload
err = r.SaveExecutionPayloadCapella(ctx, tx, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella)
err = r.SaveExecutionPayloadCapella(ctx, pipeliner, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella)
if err != nil {
return state, err
}
Expand All @@ -509,7 +509,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
prevTime = nextTime

// 2. Save latest bid for this builder
err = r.SaveBuilderBid(ctx, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String(), reqReceivedAt, getHeaderResponse)
err = r.SaveBuilderBid(ctx, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String(), reqReceivedAt, getHeaderResponse)
if err != nil {
return state, err
}
Expand All @@ -522,7 +522,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
prevTime = nextTime

// 3. Save the bid trace
err = r.SaveBidTrace(ctx, tx, trace)
err = r.SaveBidTrace(ctx, pipeliner, trace)
if err != nil {
return state, err
}
Expand All @@ -538,7 +538,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
return state, nil
}

state, err = r._updateTopBid(ctx, tx, state, builderBids, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), floorValue)
state, err = r._updateTopBid(ctx, pipeliner, state, builderBids, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), floorValue)
if err != nil {
return state, err
}
Expand All @@ -556,8 +556,8 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
// Non-cancellable bid above floor should set new floor
keyBidSource := r.keyLatestBidByBuilder(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String())
keyFloorBid := r.keyFloorBid(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
c := tx.Copy(ctx, keyBidSource, keyFloorBid, 0, true)
_, err = tx.Exec(ctx)
c := pipeliner.Copy(ctx, keyBidSource, keyFloorBid, 0, true)
_, err = pipeliner.Exec(ctx)
if err != nil {
return state, err
}
Expand All @@ -568,19 +568,19 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
} else if wasCopied == 0 {
return state, fmt.Errorf("could not copy floor bid from %s to %s", keyBidSource, keyFloorBid) //nolint:goerr113
}
err = tx.Expire(ctx, keyFloorBid, expiryBidCache).Err()
err = pipeliner.Expire(ctx, keyFloorBid, expiryBidCache).Err()
if err != nil {
return state, err
}

keyFloorBidValue := r.keyFloorBidValue(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
err = tx.Set(ctx, keyFloorBidValue, payload.Value().String(), expiryBidCache).Err()
err = pipeliner.Set(ctx, keyFloorBidValue, payload.Value().String(), expiryBidCache).Err()
if err != nil {
return state, err
}

// Execute setting the floor bid
_, err = tx.Exec(ctx)
_, err = pipeliner.Exec(ctx)

// Record time needed to update floor
nextTime = time.Now().UTC()
Expand All @@ -589,9 +589,9 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
return state, err
}

func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, state SaveBidAndUpdateTopBidResponse, builderBids *BuilderBids, slot uint64, parentHash, proposerPubkey string, floorValue *big.Int) (resp SaveBidAndUpdateTopBidResponse, err error) {
func (r *RedisCache) _updateTopBid(ctx context.Context, pipeliner redis.Pipeliner, state SaveBidAndUpdateTopBidResponse, builderBids *BuilderBids, slot uint64, parentHash, proposerPubkey string, floorValue *big.Int) (resp SaveBidAndUpdateTopBidResponse, err error) {
if builderBids == nil {
builderBids, err = NewBuilderBidsFromRedis(ctx, r, tx, slot, parentHash, proposerPubkey)
builderBids, err = NewBuilderBidsFromRedis(ctx, r, pipeliner, slot, parentHash, proposerPubkey)
if err != nil {
return state, err
}
Expand All @@ -603,7 +603,7 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat

// Load floor value (if not passed in already)
if floorValue == nil {
floorValue, err = r.GetFloorBidValue(ctx, tx, slot, parentHash, proposerPubkey)
floorValue, err = r.GetFloorBidValue(ctx, pipeliner, slot, parentHash, proposerPubkey)
if err != nil {
return state, err
}
Expand All @@ -621,8 +621,8 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat

// Copy winning bid to top bid cache
keyTopBid := r.keyCacheGetHeaderResponse(slot, parentHash, proposerPubkey)
c := tx.Copy(context.Background(), keyBidSource, keyTopBid, 0, true)
_, err = tx.Exec(ctx)
c := pipeliner.Copy(context.Background(), keyBidSource, keyTopBid, 0, true)
_, err = pipeliner.Exec(ctx)
if err != nil {
return state, err
}
Expand All @@ -632,7 +632,7 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat
} else if wasCopied == 0 {
return state, fmt.Errorf("could not copy top bid from %s to %s", keyBidSource, keyTopBid) //nolint:goerr113
}
err = tx.Expire(context.Background(), keyTopBid, expiryBidCache).Err()
err = pipeliner.Expire(context.Background(), keyTopBid, expiryBidCache).Err()
if err != nil {
return state, err
}
Expand All @@ -641,20 +641,20 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat

// 6. Finally, update the global top bid value
keyTopBidValue := r.keyTopBidValue(slot, parentHash, proposerPubkey)
err = tx.Set(context.Background(), keyTopBidValue, state.TopBidValue.String(), expiryBidCache).Err()
err = pipeliner.Set(context.Background(), keyTopBidValue, state.TopBidValue.String(), expiryBidCache).Err()
if err != nil {
return state, err
}

_, err = tx.Exec(ctx)
_, err = pipeliner.Exec(ctx)
return state, err
}

// GetTopBidValue gets the top bid value for a given slot+parent+proposer combination
func (r *RedisCache) GetTopBidValue(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (topBidValue *big.Int, err error) {
func (r *RedisCache) GetTopBidValue(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (topBidValue *big.Int, err error) {
keyTopBidValue := r.keyTopBidValue(slot, parentHash, proposerPubkey)
c := tx.Get(ctx, keyTopBidValue)
_, err = tx.Exec(ctx)
c := pipeliner.Get(ctx, keyTopBidValue)
_, err = pipeliner.Exec(ctx)
if errors.Is(err, redis.Nil) {
return big.NewInt(0), nil
} else if err != nil {
Expand Down Expand Up @@ -685,7 +685,7 @@ func (r *RedisCache) GetBuilderLatestValue(slot uint64, parentHash, proposerPubk
}

// DelBuilderBid removes a builders most recent bid
func (r *RedisCache) DelBuilderBid(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string) (err error) {
func (r *RedisCache) DelBuilderBid(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string) (err error) {
// delete the value
keyLatestValue := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey)
err = r.client.HDel(ctx, keyLatestValue, builderPubkey).Err()
Expand All @@ -702,16 +702,16 @@ func (r *RedisCache) DelBuilderBid(ctx context.Context, tx redis.Pipeliner, slot

// update bids now to compute current top bid
state := SaveBidAndUpdateTopBidResponse{} //nolint:exhaustruct
_, err = r._updateTopBid(ctx, tx, state, nil, slot, parentHash, proposerPubkey, nil)
_, err = r._updateTopBid(ctx, pipeliner, state, nil, slot, parentHash, proposerPubkey, nil)
return err
}

// GetFloorBidValue returns the value of the highest non-cancellable bid
func (r *RedisCache) GetFloorBidValue(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (floorValue *big.Int, err error) {
func (r *RedisCache) GetFloorBidValue(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (floorValue *big.Int, err error) {
keyFloorBidValue := r.keyFloorBidValue(slot, parentHash, proposerPubkey)
c := tx.Get(ctx, keyFloorBidValue)
c := pipeliner.Get(ctx, keyFloorBidValue)

_, err = tx.Exec(ctx)
_, err = pipeliner.Exec(ctx)
if errors.Is(err, redis.Nil) {
return big.NewInt(0), nil
} else if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions datastore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ func TestGetBuilderLatestValue(t *testing.T) {
},
}

_, err = cache.client.TxPipelined(context.Background(), func(tx redis.Pipeliner) error {
return cache.SaveBuilderBid(context.Background(), tx, slot, parentHash, proposerPubkey, builderPubkey, time.Now().UTC(), getHeaderResp)
_, err = cache.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error {
return cache.SaveBuilderBid(context.Background(), pipeliner, slot, parentHash, proposerPubkey, builderPubkey, time.Now().UTC(), getHeaderResp)
})
require.NoError(t, err)

Expand All @@ -518,7 +518,7 @@ func TestPipelineNilCheck(t *testing.T) {
// err := cache.client.Set(context.Background(), key1, val, 0).Err()
// require.NoError(t, err)

// _, err = cache.client.TxPipelined(context.Background(), func(tx redis.Pipeliner) error {
// _, err = cache.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error {
// c := tx.Get(context.Background(), key1)
// _, err := tx.Exec(context.Background())
// require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions datastore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ type BuilderBids struct {
bidValues map[string]*big.Int
}

func NewBuilderBidsFromRedis(ctx context.Context, r *RedisCache, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (*BuilderBids, error) {
func NewBuilderBidsFromRedis(ctx context.Context, r *RedisCache, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (*BuilderBids, error) {
keyBidValues := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey)
c := tx.HGetAll(ctx, keyBidValues)
_, err := tx.Exec(ctx)
c := pipeliner.HGetAll(ctx, keyBidValues)
_, err := pipeliner.Exec(ctx)
if err != nil && !errors.Is(err, redis.Nil) {
return nil, err
}
Expand Down
Loading