From 4d54559e3077056aeccee87564e0dfac49c14ffb Mon Sep 17 00:00:00 2001 From: Alexander Tesfamichael Date: Fri, 4 Aug 2023 12:20:14 +0200 Subject: [PATCH] refactor(redis): rename tx to pipeliner "transaction" is exactly what we shouldn't call the pipeliner. Redis offers both, the latter differs precisely in that it is not atomic. --- datastore/redis.go | 92 ++++++++++++++++++++--------------------- datastore/redis_test.go | 6 +-- datastore/utils.go | 6 +-- 3 files changed, 52 insertions(+), 52 deletions(-) diff --git a/datastore/redis.go b/datastore/redis.go index 9d9eeb2e..caec149a 100644 --- a/datastore/redis.go +++ b/datastore/redis.go @@ -206,13 +206,13 @@ func (r *RedisCache) SetObj(key string, value any, expiration time.Duration) (er } // SetObjPipelined saves an object in the given Redis key on a Redis pipeline (JSON encoded) -func (r *RedisCache) SetObjPipelined(ctx context.Context, tx redis.Pipeliner, key string, value any, expiration time.Duration) (err error) { +func (r *RedisCache) SetObjPipelined(ctx context.Context, pipeliner redis.Pipeliner, key string, value any, expiration time.Duration) (err error) { marshalledValue, err := json.Marshal(value) if err != nil { return err } - return tx.Set(ctx, key, marshalledValue, expiration).Err() + return pipeliner.Set(ctx, key, marshalledValue, expiration).Err() } func (r *RedisCache) HSetObj(key, field string, value any, expiration time.Duration) (err error) { @@ -291,9 +291,9 @@ func (r *RedisCache) CheckAndSetLastSlotAndHashDelivered(slot uint64, hash strin return r.client.Watch(context.Background(), txf, r.keyLastSlotDelivered, r.keyLastHashDelivered) } -func (r *RedisCache) GetLastSlotDelivered(ctx context.Context, tx redis.Pipeliner) (slot uint64, err error) { - c := tx.Get(ctx, r.keyLastSlotDelivered) - _, err = tx.Exec(ctx) +func (r *RedisCache) GetLastSlotDelivered(ctx context.Context, pipeliner redis.Pipeliner) (slot uint64, err error) { + c := pipeliner.Get(ctx, r.keyLastSlotDelivered) + _, err = pipeliner.Exec(ctx) if err != nil { return 0, err } @@ -358,13 +358,13 @@ func (r *RedisCache) GetBestBid(slot uint64, parentHash, proposerPubkey string) return resp, err } -func (r *RedisCache) SaveExecutionPayloadCapella(ctx context.Context, tx redis.Pipeliner, slot uint64, proposerPubkey, blockHash string, execPayload *capella.ExecutionPayload) (err error) { +func (r *RedisCache) SaveExecutionPayloadCapella(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, proposerPubkey, blockHash string, execPayload *capella.ExecutionPayload) (err error) { key := r.keyExecPayloadCapella(slot, proposerPubkey, blockHash) b, err := execPayload.MarshalSSZ() if err != nil { return err } - return tx.Set(ctx, key, b, expiryBidCache).Err() + return pipeliner.Set(ctx, key, b, expiryBidCache).Err() } func (r *RedisCache) GetExecutionPayloadCapella(slot uint64, proposerPubkey, blockHash string) (*common.VersionedExecutionPayload, error) { @@ -388,9 +388,9 @@ func (r *RedisCache) GetExecutionPayloadCapella(slot uint64, proposerPubkey, blo return resp, nil } -func (r *RedisCache) SaveBidTrace(ctx context.Context, tx redis.Pipeliner, trace *common.BidTraceV2) (err error) { +func (r *RedisCache) SaveBidTrace(ctx context.Context, pipeliner redis.Pipeliner, trace *common.BidTraceV2) (err error) { key := r.keyCacheBidTrace(trace.Slot, trace.ProposerPubkey.String(), trace.BlockHash.String()) - return r.SetObjPipelined(ctx, tx, key, trace, expiryBidCache) + return r.SetObjPipelined(ctx, pipeliner, key, trace, expiryBidCache) } // GetBidTrace returns (trace, nil), or (nil, redis.Nil) if the trace does not exist @@ -401,10 +401,10 @@ func (r *RedisCache) GetBidTrace(slot uint64, proposerPubkey, blockHash string) return resp, err } -func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, tx redis.Pipeliner, slot uint64, builderPubkey, parentHash, proposerPubkey string) (int64, error) { +func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, builderPubkey, parentHash, proposerPubkey string) (int64, error) { keyLatestBidsTime := r.keyBlockBuilderLatestBidsTime(slot, parentHash, proposerPubkey) - c := tx.HGet(context.Background(), keyLatestBidsTime, builderPubkey) - _, err := tx.Exec(ctx) + c := pipeliner.HGet(context.Background(), keyLatestBidsTime, builderPubkey) + _, err := pipeliner.Exec(ctx) if errors.Is(err, redis.Nil) { return 0, nil } else if err != nil { @@ -414,32 +414,32 @@ func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, tx r } // SaveBuilderBid saves the latest bid by a specific builder. TODO: use transaction to make these writes atomic -func (r *RedisCache) SaveBuilderBid(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string, receivedAt time.Time, headerResp *common.GetHeaderResponse) (err error) { +func (r *RedisCache) SaveBuilderBid(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string, receivedAt time.Time, headerResp *common.GetHeaderResponse) (err error) { // save the actual bid keyLatestBid := r.keyLatestBidByBuilder(slot, parentHash, proposerPubkey, builderPubkey) - err = r.SetObjPipelined(ctx, tx, keyLatestBid, headerResp, expiryBidCache) + err = r.SetObjPipelined(ctx, pipeliner, keyLatestBid, headerResp, expiryBidCache) if err != nil { return err } // set the time of the request keyLatestBidsTime := r.keyBlockBuilderLatestBidsTime(slot, parentHash, proposerPubkey) - err = tx.HSet(ctx, keyLatestBidsTime, builderPubkey, receivedAt.UnixMilli()).Err() + err = pipeliner.HSet(ctx, keyLatestBidsTime, builderPubkey, receivedAt.UnixMilli()).Err() if err != nil { return err } - err = tx.Expire(ctx, keyLatestBidsTime, expiryBidCache).Err() + err = pipeliner.Expire(ctx, keyLatestBidsTime, expiryBidCache).Err() if err != nil { return err } // set the value last, because that's iterated over when updating the best bid, and the payload has to be available keyLatestBidsValue := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey) - err = tx.HSet(ctx, keyLatestBidsValue, builderPubkey, headerResp.Value().String()).Err() + err = pipeliner.HSet(ctx, keyLatestBidsValue, builderPubkey, headerResp.Value().String()).Err() if err != nil { return err } - return tx.Expire(ctx, keyLatestBidsValue, expiryBidCache).Err() + return pipeliner.Expire(ctx, keyLatestBidsValue, expiryBidCache).Err() } type SaveBidAndUpdateTopBidResponse struct { @@ -458,19 +458,19 @@ type SaveBidAndUpdateTopBidResponse struct { TimeUpdateFloor time.Duration } -func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeliner, trace *common.BidTraceV2, payload *common.BuilderSubmitBlockRequest, getPayloadResponse *common.GetPayloadResponse, getHeaderResponse *common.GetHeaderResponse, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) { +func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis.Pipeliner, trace *common.BidTraceV2, payload *common.BuilderSubmitBlockRequest, getPayloadResponse *common.GetPayloadResponse, getHeaderResponse *common.GetHeaderResponse, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) { var prevTime, nextTime time.Time prevTime = time.Now() // Load latest bids for a given slot+parent+proposer - builderBids, err := NewBuilderBidsFromRedis(ctx, r, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey()) + builderBids, err := NewBuilderBidsFromRedis(ctx, r, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey()) if err != nil { return state, err } // Load floor value (if not passed in already) if floorValue == nil { - floorValue, err = r.GetFloorBidValue(ctx, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey()) + floorValue, err = r.GetFloorBidValue(ctx, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey()) if err != nil { return state, err } @@ -498,7 +498,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli // Time to save things in Redis // // 1. Save the execution payload - err = r.SaveExecutionPayloadCapella(ctx, tx, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella) + err = r.SaveExecutionPayloadCapella(ctx, pipeliner, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella) if err != nil { return state, err } @@ -509,7 +509,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli prevTime = nextTime // 2. Save latest bid for this builder - err = r.SaveBuilderBid(ctx, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String(), reqReceivedAt, getHeaderResponse) + err = r.SaveBuilderBid(ctx, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String(), reqReceivedAt, getHeaderResponse) if err != nil { return state, err } @@ -522,7 +522,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli prevTime = nextTime // 3. Save the bid trace - err = r.SaveBidTrace(ctx, tx, trace) + err = r.SaveBidTrace(ctx, pipeliner, trace) if err != nil { return state, err } @@ -538,7 +538,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli return state, nil } - state, err = r._updateTopBid(ctx, tx, state, builderBids, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), floorValue) + state, err = r._updateTopBid(ctx, pipeliner, state, builderBids, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), floorValue) if err != nil { return state, err } @@ -556,8 +556,8 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli // Non-cancellable bid above floor should set new floor keyBidSource := r.keyLatestBidByBuilder(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String()) keyFloorBid := r.keyFloorBid(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey()) - c := tx.Copy(ctx, keyBidSource, keyFloorBid, 0, true) - _, err = tx.Exec(ctx) + c := pipeliner.Copy(ctx, keyBidSource, keyFloorBid, 0, true) + _, err = pipeliner.Exec(ctx) if err != nil { return state, err } @@ -568,19 +568,19 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli } else if wasCopied == 0 { return state, fmt.Errorf("could not copy floor bid from %s to %s", keyBidSource, keyFloorBid) //nolint:goerr113 } - err = tx.Expire(ctx, keyFloorBid, expiryBidCache).Err() + err = pipeliner.Expire(ctx, keyFloorBid, expiryBidCache).Err() if err != nil { return state, err } keyFloorBidValue := r.keyFloorBidValue(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey()) - err = tx.Set(ctx, keyFloorBidValue, payload.Value().String(), expiryBidCache).Err() + err = pipeliner.Set(ctx, keyFloorBidValue, payload.Value().String(), expiryBidCache).Err() if err != nil { return state, err } // Execute setting the floor bid - _, err = tx.Exec(ctx) + _, err = pipeliner.Exec(ctx) // Record time needed to update floor nextTime = time.Now().UTC() @@ -589,9 +589,9 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli return state, err } -func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, state SaveBidAndUpdateTopBidResponse, builderBids *BuilderBids, slot uint64, parentHash, proposerPubkey string, floorValue *big.Int) (resp SaveBidAndUpdateTopBidResponse, err error) { +func (r *RedisCache) _updateTopBid(ctx context.Context, pipeliner redis.Pipeliner, state SaveBidAndUpdateTopBidResponse, builderBids *BuilderBids, slot uint64, parentHash, proposerPubkey string, floorValue *big.Int) (resp SaveBidAndUpdateTopBidResponse, err error) { if builderBids == nil { - builderBids, err = NewBuilderBidsFromRedis(ctx, r, tx, slot, parentHash, proposerPubkey) + builderBids, err = NewBuilderBidsFromRedis(ctx, r, pipeliner, slot, parentHash, proposerPubkey) if err != nil { return state, err } @@ -603,7 +603,7 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat // Load floor value (if not passed in already) if floorValue == nil { - floorValue, err = r.GetFloorBidValue(ctx, tx, slot, parentHash, proposerPubkey) + floorValue, err = r.GetFloorBidValue(ctx, pipeliner, slot, parentHash, proposerPubkey) if err != nil { return state, err } @@ -621,8 +621,8 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat // Copy winning bid to top bid cache keyTopBid := r.keyCacheGetHeaderResponse(slot, parentHash, proposerPubkey) - c := tx.Copy(context.Background(), keyBidSource, keyTopBid, 0, true) - _, err = tx.Exec(ctx) + c := pipeliner.Copy(context.Background(), keyBidSource, keyTopBid, 0, true) + _, err = pipeliner.Exec(ctx) if err != nil { return state, err } @@ -632,7 +632,7 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat } else if wasCopied == 0 { return state, fmt.Errorf("could not copy top bid from %s to %s", keyBidSource, keyTopBid) //nolint:goerr113 } - err = tx.Expire(context.Background(), keyTopBid, expiryBidCache).Err() + err = pipeliner.Expire(context.Background(), keyTopBid, expiryBidCache).Err() if err != nil { return state, err } @@ -641,20 +641,20 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat // 6. Finally, update the global top bid value keyTopBidValue := r.keyTopBidValue(slot, parentHash, proposerPubkey) - err = tx.Set(context.Background(), keyTopBidValue, state.TopBidValue.String(), expiryBidCache).Err() + err = pipeliner.Set(context.Background(), keyTopBidValue, state.TopBidValue.String(), expiryBidCache).Err() if err != nil { return state, err } - _, err = tx.Exec(ctx) + _, err = pipeliner.Exec(ctx) return state, err } // GetTopBidValue gets the top bid value for a given slot+parent+proposer combination -func (r *RedisCache) GetTopBidValue(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (topBidValue *big.Int, err error) { +func (r *RedisCache) GetTopBidValue(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (topBidValue *big.Int, err error) { keyTopBidValue := r.keyTopBidValue(slot, parentHash, proposerPubkey) - c := tx.Get(ctx, keyTopBidValue) - _, err = tx.Exec(ctx) + c := pipeliner.Get(ctx, keyTopBidValue) + _, err = pipeliner.Exec(ctx) if errors.Is(err, redis.Nil) { return big.NewInt(0), nil } else if err != nil { @@ -685,7 +685,7 @@ func (r *RedisCache) GetBuilderLatestValue(slot uint64, parentHash, proposerPubk } // DelBuilderBid removes a builders most recent bid -func (r *RedisCache) DelBuilderBid(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string) (err error) { +func (r *RedisCache) DelBuilderBid(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string) (err error) { // delete the value keyLatestValue := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey) err = r.client.HDel(ctx, keyLatestValue, builderPubkey).Err() @@ -702,16 +702,16 @@ func (r *RedisCache) DelBuilderBid(ctx context.Context, tx redis.Pipeliner, slot // update bids now to compute current top bid state := SaveBidAndUpdateTopBidResponse{} //nolint:exhaustruct - _, err = r._updateTopBid(ctx, tx, state, nil, slot, parentHash, proposerPubkey, nil) + _, err = r._updateTopBid(ctx, pipeliner, state, nil, slot, parentHash, proposerPubkey, nil) return err } // GetFloorBidValue returns the value of the highest non-cancellable bid -func (r *RedisCache) GetFloorBidValue(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (floorValue *big.Int, err error) { +func (r *RedisCache) GetFloorBidValue(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (floorValue *big.Int, err error) { keyFloorBidValue := r.keyFloorBidValue(slot, parentHash, proposerPubkey) - c := tx.Get(ctx, keyFloorBidValue) + c := pipeliner.Get(ctx, keyFloorBidValue) - _, err = tx.Exec(ctx) + _, err = pipeliner.Exec(ctx) if errors.Is(err, redis.Nil) { return big.NewInt(0), nil } else if err != nil { diff --git a/datastore/redis_test.go b/datastore/redis_test.go index b2ec2124..679085fe 100644 --- a/datastore/redis_test.go +++ b/datastore/redis_test.go @@ -491,8 +491,8 @@ func TestGetBuilderLatestValue(t *testing.T) { }, } - _, err = cache.client.TxPipelined(context.Background(), func(tx redis.Pipeliner) error { - return cache.SaveBuilderBid(context.Background(), tx, slot, parentHash, proposerPubkey, builderPubkey, time.Now().UTC(), getHeaderResp) + _, err = cache.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { + return cache.SaveBuilderBid(context.Background(), pipeliner, slot, parentHash, proposerPubkey, builderPubkey, time.Now().UTC(), getHeaderResp) }) require.NoError(t, err) @@ -518,7 +518,7 @@ func TestPipelineNilCheck(t *testing.T) { // err := cache.client.Set(context.Background(), key1, val, 0).Err() // require.NoError(t, err) -// _, err = cache.client.TxPipelined(context.Background(), func(tx redis.Pipeliner) error { +// _, err = cache.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { // c := tx.Get(context.Background(), key1) // _, err := tx.Exec(context.Background()) // require.NoError(t, err) diff --git a/datastore/utils.go b/datastore/utils.go index 66af6e2f..d9507208 100644 --- a/datastore/utils.go +++ b/datastore/utils.go @@ -13,10 +13,10 @@ type BuilderBids struct { bidValues map[string]*big.Int } -func NewBuilderBidsFromRedis(ctx context.Context, r *RedisCache, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (*BuilderBids, error) { +func NewBuilderBidsFromRedis(ctx context.Context, r *RedisCache, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (*BuilderBids, error) { keyBidValues := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey) - c := tx.HGetAll(ctx, keyBidValues) - _, err := tx.Exec(ctx) + c := pipeliner.HGetAll(ctx, keyBidValues) + _, err := pipeliner.Exec(ctx) if err != nil && !errors.Is(err, redis.Nil) { return nil, err }