Skip to content
6 changes: 6 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,12 @@ func splitReceiptsAndDeriveFields(receipts rlp.RawValue, number uint64, hash com
return nil, nil
}

// After the state-sync HF, no need to split receipts as all receipts for a block
// are stored together (i.e. under same key).
if borCfg.IsStateSync(big.NewInt(int64(number))) {
return receipts, nil
}

// Bor receipts can only exist on sprint end blocks. Avoid decoding if possible.
if !types.IsSprintEndBlock(borCfg, number) {
return receipts, nil
Expand Down
11 changes: 8 additions & 3 deletions eth/downloader/bor_fetchers_concurrent_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,15 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the receipt data and delivering it to the downloader's queue.
func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
receipts := *packet.Res.(*eth.ReceiptsRLPResponse)
hashes := packet.Meta.([]common.Hash) // {receipt hashes}
// We're expecting a full decoded receipt list instead of encoded receipts for storage
// we used to have earlier.
receipts, getReceiptListHashes := eth.EncodeReceiptsAndPrepareHasher(packet.Res, q.queue.borConfig)
if receipts == nil || getReceiptListHashes == nil {
peer.log.Warn("Unknown receipt list type, discarding packet")
return 0, nil
}

accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)
accepted, err := q.queue.DeliverReceipts(peer.id, receipts, getReceiptListHashes)

switch {
case err == nil && len(receipts) == 0:
Expand Down
6 changes: 4 additions & 2 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package downloader
import (
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1034,12 +1035,13 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
// DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) {
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, getReceiptListHash func(int, *big.Int) common.Hash) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()

validate := func(index int, header *types.Header) error {
if receiptListHashes[index] != header.ReceiptHash {
receiptListHash := getReceiptListHash(index, header.Number)
if receiptListHash != header.ReceiptHash {
return errInvalidReceipt
}

Expand Down
9 changes: 4 additions & 5 deletions eth/downloader/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,12 +470,11 @@ func XTestDelivery(t *testing.T) {
}

hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(rcs))

for i, receipt := range rcs {
hashes[i] = types.DeriveSha(receipt, hasher)
getHashes := func(index int, number *big.Int) common.Hash {
return types.DeriveSha(rcs[index], hasher)
}
_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes)

_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), getHashes)
if err != nil {
fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
}
Expand Down
133 changes: 95 additions & 38 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package eth
import (
"encoding/json"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/tracker"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
Expand Down Expand Up @@ -336,18 +339,49 @@ func ServiceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest)
bytes int
receipts []rlp.RawValue
)
borCfg := chain.Config().Bor
for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
break
}

// In order to include state-sync transaction receipts, which resides in a separate
// table in db, we need to separately fetch normal and state-sync transaction receipts.
// Later, decode them, merge them into a single unit and re-encode the final list to
// be sent over p2p.
number := rawdb.ReadHeaderNumber(chain.DB(), hash)
if number == nil {
continue
}

// If we're past the state-sync hardfork, state-sync receipts (if present) are stored
// with normal block receipts so no special handling needed.
if borCfg != nil && borCfg.IsStateSync(big.NewInt(int64(*number))) {
allReceipts := chain.GetReceiptsRLP(hash)
if allReceipts == nil {
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
}
body := chain.GetBodyRLP(hash)
if body == nil {
continue
}
// Noop as no special handling is needed
isStateSyncReceipt := func(index int) bool {
return false
}
results, err := blockReceiptsToNetwork69(allReceipts, body, isStateSyncReceipt)
if err != nil {
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
continue
}

receipts = append(receipts, results)
bytes += len(results)
continue
}

// Fetch receipts of normal evm transactions
// Before state-sync HF, we need to fetch state-sync receipts separately along with fetching
// block receipts. Upon fetching, decode them, merge them into a single unit and re-encode
// the final list to be sent over p2p.
normalReceipts := chain.GetReceiptsRLP(hash)
var normalReceiptsDecoded []*types.ReceiptForStorage
if normalReceipts != nil {
Expand All @@ -374,10 +408,6 @@ func ServiceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest)
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}

// Append an empty entry for this block and continue
receipts = append(receipts, nil)
continue
}

// Track existence of bor receipts for encoding
Expand Down Expand Up @@ -534,50 +564,77 @@ func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) er
return err
}

// The response in p2p packet can only be consumed once. As we need a copy of receipt
// to exclude state-sync transaction receipt from receipt root calculation, encode
// and decode the response back.
packet, err := rlp.EncodeToBytes(res)
if err != nil {
return fmt.Errorf("failed to re-encode receipt packet for making copy: %w", err)
}

resWithoutStateSync := new(ReceiptsPacket[L])
if err := rlp.DecodeBytes(packet, resWithoutStateSync); err != nil {
return fmt.Errorf("failed to decode re-encoded receipt packet for making copy: %w", err)
}

// Assign temporary hashing buffer to each list item, the same buffer is shared
// between all receipt list instances.
buffers := new(receiptListBuffers)
for i := range res.List {
res.List[i].setBuffers(buffers)
}

// The `metadata` function below was used earlier to calculate `ReceiptHash` which is further
// used to validate against `header.ReceiptHash`. By default, state-sync receipts (which are
// appended at the end of list for a block) are excluded from the `ReceiptHash` calculation.
// After the state-sync hard fork, they should be included in the calculation. We don't have
// access to block number here so we can't determine whether to exclude or not. Instead, just
// ignore the `metadata` function and pass on the whole receipt list as is. The receipt queue
// handler which has access to block number will take care of the exclusion if needed.
metadata := func() interface{} {
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(resWithoutStateSync.List))
for i := range resWithoutStateSync.List {
// The receipt root of a block doesn't include receipts from state-sync
// transactions specific to polygon. Exclude them for calculating the
// hashes of all receipts.
resWithoutStateSync.List[i].ExcludeStateSyncReceipt()
hashes[i] = types.DeriveSha(resWithoutStateSync.List[i], hasher)
}

return hashes
}
var enc ReceiptsRLPResponse
for i := range res.List {
enc = append(enc, res.List[i].EncodeForStorage())
return nil
}

// Assign the decoded receipt list to the result of `Response` packet.
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: ReceiptsMsg,
Res: &enc,
Res: &res.List,
}, metadata)
}

// EncodeReceiptsAndPrepareHasher encodes a list of receipts to the storage format (does not
// include TxType field). It also returns a function which calculates `ReceiptHash` of a receipt list
// based on the whether we've crossed the hardfork or not.
func EncodeReceiptsAndPrepareHasher(packet interface{}, borCfg *params.BorConfig) (ReceiptsRLPResponse, func(int, *big.Int) common.Hash) {
// Extract receipts based on type. Add/remove support for new types here as needed.
var (
receipts ReceiptsRLPResponse
getReceiptListHashes func(int, *big.Int) common.Hash
)
switch packet := packet.(type) {
case []*ReceiptList68:
receiptList := packet
receipts, getReceiptListHashes = encodeReceiptsAndPrepareHasher(receiptList, borCfg)
case []*ReceiptList69:
receiptList := packet
receipts, getReceiptListHashes = encodeReceiptsAndPrepareHasher(receiptList, borCfg)
default:
// This shouldn't happen unless there's a bug in identifying type of receipt list
// or there's a new type which isn't handled here.
return nil, nil
}
return receipts, getReceiptListHashes
}

// encodeReceiptsAndPrepareHasher is an internal generic function for all receipt types
func encodeReceiptsAndPrepareHasher[L ReceiptsList](receipts []L, borCfg *params.BorConfig) (ReceiptsRLPResponse, func(int, *big.Int) common.Hash) {
var encodedReceipts ReceiptsRLPResponse = make(ReceiptsRLPResponse, len(receipts))
for i := range receipts {
encodedReceipts[i] = receipts[i].EncodeForStorage()
}

hasher := trie.NewStackTrie(nil)
calculateReceiptHashes := func(index int, number *big.Int) common.Hash {
// Don't exclude state-sync receipts for post hardfork blocks
if borCfg.IsStateSync(number) {
return types.DeriveSha(receipts[index], hasher)
} else {
receipts[index].ExcludeStateSyncReceipt()
return types.DeriveSha(receipts[index], hasher)
}
}

return encodedReceipts, calculateReceiptHashes
}

func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
Expand Down
5 changes: 4 additions & 1 deletion eth/protocols/eth/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,10 @@ func blockReceiptsToNetwork69(blockReceipts, blockBody rlp.RawValue, isStateSync
content, _, _ := rlp.SplitList(it.Value())
receiptList := enc.List()
if isStateSyncReceipt(i) {
enc.WriteUint64(uint64(0)) // TxType is always 0 for state-sync transactions
// TxType is always 0 for state-sync transactions before state-sync hardfork. Post
// hardfork, they will be part of normal block receipts and body so no special
// handling needed.
enc.WriteUint64(uint64(0))
} else {
txType, _ := nextTxType()
enc.WriteUint64(uint64(txType))
Expand Down
Loading
Loading