From bdd1dc8be4895262e7a454a26a04966b14596bdf Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Wed, 14 May 2025 22:26:45 +0200 Subject: [PATCH 1/8] add block iterator --- pkg/rpc/core/blocks.go | 102 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 4 deletions(-) diff --git a/pkg/rpc/core/blocks.go b/pkg/rpc/core/blocks.go index e396076..656cf20 100644 --- a/pkg/rpc/core/blocks.go +++ b/pkg/rpc/core/blocks.go @@ -1,7 +1,9 @@ package core import ( + "context" "encoding/binary" + "encoding/json" "errors" "fmt" "sort" @@ -11,6 +13,8 @@ import ( ctypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" cmttypes "github.com/cometbft/cometbft/types" + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" "github.com/rollkit/rollkit/block" rlktypes "github.com/rollkit/rollkit/types" @@ -276,13 +280,12 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes. env.Logger.Debug("BlockchainInfo", "maxHeight", maxHeight, "minHeight", minHeight) blocks := make([]*cmttypes.BlockMeta, 0, maxHeight-minHeight+1) - for height := maxHeight; height >= minHeight; height-- { - header, data, err := env.Adapter.RollkitStore.GetBlockData(ctx.Context(), uint64(height)) + for _, block := range BlockIterator(maxHeight, minHeight) { if err != nil { return nil, err } - if header != nil && data != nil { - cmblockmeta, err := common.ToABCIBlockMeta(header, data) + if block.header != nil && block.data != nil { + cmblockmeta, err := common.ToABCIBlockMeta(block.header, block.data) if err != nil { return nil, err } @@ -295,3 +298,94 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes. BlockMetas: blocks, }, nil } + +type BlockFilter struct { // needs this for the Filter interface + start int64 + end int64 + field string //need this field for differentiation between getting headers and getting data +} + +func (f *BlockFilter) Filter(e dsq.Entry) bool { + var height uint64 + if f.field == "data" { //not great but necessary because we are not getting the same data + var block rlktypes.Data + err := json.Unmarshal(e.Value, &block) + if err != nil { + return false + } + height = block.Height() + } + if f.field == "header" { + var block rlktypes.SignedHeader + err := json.Unmarshal(e.Value, &block) + if err != nil { + return false + } + height = block.Height() + } + + return height >= uint64(f.end) && height <= uint64(f.start) +} + +func BlockIterator(start int64, end int64) []BlockResponse { + var blocks []BlockResponse + ds, ok := env.Adapter.RollkitStore.(ds.Batching) + if !ok { + return blocks + } + filter := &BlockFilter{start: start, end: end} + + // we need to do two queries, one for the block header and one for the block data + qHeader := dsq.Query{ + Prefix: "d", + } + qHeader.Filters = append(qHeader.Filters, filter) + + qData := dsq.Query{ + Prefix: "h", + } + qData.Filters = append(qData.Filters, filter) + // TODO: add sorting to get the result in the right order + + rHeader, err := ds.Query(context.Background(), qHeader) + if err != nil { + return blocks + } + rData, err := ds.Query(context.Background(), qData) + if err != nil { + return blocks + } + + headers, err := rHeader.Rest() // wait to get all the results, not needed but easier implementation for now + if err != nil { + return blocks + } + datas, err := rData.Rest() + if err != nil { + return blocks + } + + for i := 0; i < len(headers); i++ { + header := new(rlktypes.SignedHeader) + err = header.UnmarshalBinary(headers[i].Value) + if err != nil { + continue + } + + data := new(rlktypes.Data) + err = data.UnmarshalBinary(datas[i].Value) + if err != nil { + continue + } + // TODO: add a check that the heights are the same for the header and for the data + blocks = append(blocks, BlockResponse{header: header, data: data}) + // here we do the assumption that the header and data results are sorted and that the heights are matching + + } + return blocks +} + +type BlockResponse struct { + header *rlktypes.SignedHeader + data *rlktypes.Data +} From 6b4052e474d71cfbbdc97d4455ad9dc7c1b6441b Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 15 May 2025 20:03:41 +0200 Subject: [PATCH 2/8] add sorting + height matching and moved to utils --- pkg/rpc/core/blocks.go | 95 ------------------------------------- pkg/rpc/core/utils.go | 103 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 95 deletions(-) diff --git a/pkg/rpc/core/blocks.go b/pkg/rpc/core/blocks.go index 656cf20..89c4ff8 100644 --- a/pkg/rpc/core/blocks.go +++ b/pkg/rpc/core/blocks.go @@ -1,9 +1,7 @@ package core import ( - "context" "encoding/binary" - "encoding/json" "errors" "fmt" "sort" @@ -13,8 +11,6 @@ import ( ctypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" cmttypes "github.com/cometbft/cometbft/types" - ds "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" "github.com/rollkit/rollkit/block" rlktypes "github.com/rollkit/rollkit/types" @@ -298,94 +294,3 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes. BlockMetas: blocks, }, nil } - -type BlockFilter struct { // needs this for the Filter interface - start int64 - end int64 - field string //need this field for differentiation between getting headers and getting data -} - -func (f *BlockFilter) Filter(e dsq.Entry) bool { - var height uint64 - if f.field == "data" { //not great but necessary because we are not getting the same data - var block rlktypes.Data - err := json.Unmarshal(e.Value, &block) - if err != nil { - return false - } - height = block.Height() - } - if f.field == "header" { - var block rlktypes.SignedHeader - err := json.Unmarshal(e.Value, &block) - if err != nil { - return false - } - height = block.Height() - } - - return height >= uint64(f.end) && height <= uint64(f.start) -} - -func BlockIterator(start int64, end int64) []BlockResponse { - var blocks []BlockResponse - ds, ok := env.Adapter.RollkitStore.(ds.Batching) - if !ok { - return blocks - } - filter := &BlockFilter{start: start, end: end} - - // we need to do two queries, one for the block header and one for the block data - qHeader := dsq.Query{ - Prefix: "d", - } - qHeader.Filters = append(qHeader.Filters, filter) - - qData := dsq.Query{ - Prefix: "h", - } - qData.Filters = append(qData.Filters, filter) - // TODO: add sorting to get the result in the right order - - rHeader, err := ds.Query(context.Background(), qHeader) - if err != nil { - return blocks - } - rData, err := ds.Query(context.Background(), qData) - if err != nil { - return blocks - } - - headers, err := rHeader.Rest() // wait to get all the results, not needed but easier implementation for now - if err != nil { - return blocks - } - datas, err := rData.Rest() - if err != nil { - return blocks - } - - for i := 0; i < len(headers); i++ { - header := new(rlktypes.SignedHeader) - err = header.UnmarshalBinary(headers[i].Value) - if err != nil { - continue - } - - data := new(rlktypes.Data) - err = data.UnmarshalBinary(datas[i].Value) - if err != nil { - continue - } - // TODO: add a check that the heights are the same for the header and for the data - blocks = append(blocks, BlockResponse{header: header, data: data}) - // here we do the assumption that the header and data results are sorted and that the heights are matching - - } - return blocks -} - -type BlockResponse struct { - header *rlktypes.SignedHeader - data *rlktypes.Data -} diff --git a/pkg/rpc/core/utils.go b/pkg/rpc/core/utils.go index a9bd628..c737637 100644 --- a/pkg/rpc/core/utils.go +++ b/pkg/rpc/core/utils.go @@ -3,12 +3,17 @@ package core import ( "context" "encoding/hex" + "encoding/json" "errors" "fmt" + "sort" cmttypes "github.com/cometbft/cometbft/types" + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" "github.com/rollkit/go-execution-abci/pkg/common" + rlktypes "github.com/rollkit/rollkit/types" ) const NodeIDByteLength = 20 @@ -104,3 +109,101 @@ func TruncateNodeID(idStr string) (string, error) { } return hex.EncodeToString(idBytes[:NodeIDByteLength]), nil } + +func getHeightFromEntry(field string, value []byte) (uint64, error) { + if field == "data" { + var block rlktypes.Data + if err := json.Unmarshal(value, &block); err != nil { + return 0, err + } + return block.Height(), nil + } else if field == "header" { + var block rlktypes.SignedHeader + if err := json.Unmarshal(value, &block); err != nil { + return 0, err + } + return block.Height(), nil + } + return 0, fmt.Errorf("unknown field: %s", field) +} + +type BlockFilter struct { // needs this for the Filter interface + start int64 + end int64 + field string //need this field for differentiation between getting headers and getting data +} + +func (f *BlockFilter) Filter(e dsq.Entry) bool { + height, err := getHeightFromEntry(f.field, e.Value) + if err != nil { + return false + } + return height >= uint64(f.end) && height <= uint64(f.start) +} + +func BlockIterator(start int64, end int64) []BlockResponse { + var blocks []BlockResponse + ds, ok := env.Adapter.RollkitStore.(ds.Batching) + if !ok { + return blocks + } + filterData := &BlockFilter{start: start, end: end, field: "data"} + filterHeader := &BlockFilter{start: start, end: end, field: "header"} + + // we need to do two queries, one for the block header and one for the block data + qHeader := dsq.Query{ + Prefix: "h", + } + qHeader.Filters = append(qHeader.Filters, filterHeader) + + qData := dsq.Query{ + Prefix: "d", + } + qData.Filters = append(qData.Filters, filterData) + // TODO: add sorting to get the result in the right order + + rHeader, err := ds.Query(context.Background(), qHeader) + if err != nil { + return blocks + } + rData, err := ds.Query(context.Background(), qData) + if err != nil { + return blocks + } + + headerMap := make(map[uint64]*rlktypes.SignedHeader) + for h := range rHeader.Next() { + header := new(rlktypes.SignedHeader) + if err := header.UnmarshalBinary(h.Value); err != nil { + continue + } + headerMap[header.Height()] = header + } + + dataMap := make(map[uint64]*rlktypes.Data) + for d := range rData.Next() { + data := new(rlktypes.Data) + if err := data.UnmarshalBinary(d.Value); err != nil { + continue + } + dataMap[data.Height()] = data + } + + for height, header := range headerMap { + if data, ok := dataMap[height]; ok { + blocks = append(blocks, BlockResponse{header: header, data: data}) + } + } + + // Sort blocks by height descending + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].header.Height() > blocks[j].header.Height() + }) + + return blocks +} + +type BlockResponse struct { + header *rlktypes.SignedHeader + data *rlktypes.Data +} From c3e211d0e61af7502a0d5972e4b8543cf699d6d6 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 15 May 2025 20:22:43 +0200 Subject: [PATCH 3/8] added comment and linting --- pkg/rpc/core/utils.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/rpc/core/utils.go b/pkg/rpc/core/utils.go index c737637..17b3e86 100644 --- a/pkg/rpc/core/utils.go +++ b/pkg/rpc/core/utils.go @@ -12,8 +12,9 @@ import ( ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" - "github.com/rollkit/go-execution-abci/pkg/common" rlktypes "github.com/rollkit/rollkit/types" + + "github.com/rollkit/go-execution-abci/pkg/common" ) const NodeIDByteLength = 20 @@ -111,13 +112,14 @@ func TruncateNodeID(idStr string) (string, error) { } func getHeightFromEntry(field string, value []byte) (uint64, error) { - if field == "data" { + switch field { + case "data": var block rlktypes.Data if err := json.Unmarshal(value, &block); err != nil { return 0, err } return block.Height(), nil - } else if field == "header" { + case "header": var block rlktypes.SignedHeader if err := json.Unmarshal(value, &block); err != nil { return 0, err @@ -160,7 +162,6 @@ func BlockIterator(start int64, end int64) []BlockResponse { Prefix: "d", } qData.Filters = append(qData.Filters, filterData) - // TODO: add sorting to get the result in the right order rHeader, err := ds.Query(context.Background(), qHeader) if err != nil { @@ -171,6 +172,7 @@ func BlockIterator(start int64, end int64) []BlockResponse { return blocks } + //we need to match the data to the header using the height, for that we use a map headerMap := make(map[uint64]*rlktypes.SignedHeader) for h := range rHeader.Next() { header := new(rlktypes.SignedHeader) @@ -189,13 +191,14 @@ func BlockIterator(start int64, end int64) []BlockResponse { dataMap[data.Height()] = data } + //maps the headers to the data for height, header := range headerMap { if data, ok := dataMap[height]; ok { blocks = append(blocks, BlockResponse{header: header, data: data}) } } - // Sort blocks by height descending + //sort blocks by height descending sort.Slice(blocks, func(i, j int) bool { return blocks[i].header.Height() > blocks[j].header.Height() }) From 6bcce80551738031b59248bce84353debbba9789 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 15 May 2025 20:41:48 +0200 Subject: [PATCH 4/8] fixed marshalling --- pkg/rpc/core/utils.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/rpc/core/utils.go b/pkg/rpc/core/utils.go index 17b3e86..b80a587 100644 --- a/pkg/rpc/core/utils.go +++ b/pkg/rpc/core/utils.go @@ -3,7 +3,6 @@ package core import ( "context" "encoding/hex" - "encoding/json" "errors" "fmt" "sort" @@ -114,17 +113,17 @@ func TruncateNodeID(idStr string) (string, error) { func getHeightFromEntry(field string, value []byte) (uint64, error) { switch field { case "data": - var block rlktypes.Data - if err := json.Unmarshal(value, &block); err != nil { + data := new(rlktypes.Data) + if err := data.UnmarshalBinary(value); err != nil { return 0, err } - return block.Height(), nil + return data.Height(), nil case "header": - var block rlktypes.SignedHeader - if err := json.Unmarshal(value, &block); err != nil { + header := new(rlktypes.SignedHeader) + if err := header.UnmarshalBinary(value); err != nil { return 0, err } - return block.Height(), nil + return header.Height(), nil } return 0, fmt.Errorf("unknown field: %s", field) } From 936b746791f8e52da7e9b38558e240816753e2cf Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 15 May 2025 20:57:30 +0200 Subject: [PATCH 5/8] fix issues, close channels --- pkg/rpc/core/blocks.go | 3 --- pkg/rpc/core/utils.go | 2 ++ 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/rpc/core/blocks.go b/pkg/rpc/core/blocks.go index 89c4ff8..b1d1afa 100644 --- a/pkg/rpc/core/blocks.go +++ b/pkg/rpc/core/blocks.go @@ -277,9 +277,6 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes. blocks := make([]*cmttypes.BlockMeta, 0, maxHeight-minHeight+1) for _, block := range BlockIterator(maxHeight, minHeight) { - if err != nil { - return nil, err - } if block.header != nil && block.data != nil { cmblockmeta, err := common.ToABCIBlockMeta(block.header, block.data) if err != nil { diff --git a/pkg/rpc/core/utils.go b/pkg/rpc/core/utils.go index b80a587..09a8a6e 100644 --- a/pkg/rpc/core/utils.go +++ b/pkg/rpc/core/utils.go @@ -170,6 +170,8 @@ func BlockIterator(start int64, end int64) []BlockResponse { if err != nil { return blocks } + defer rHeader.Close() //nolint:errcheck + defer rData.Close() //nolint:errcheck //we need to match the data to the header using the height, for that we use a map headerMap := make(map[uint64]*rlktypes.SignedHeader) From 6edd4c6723b22cec20201fc17fcb9049d20833d7 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 15 May 2025 21:12:06 +0200 Subject: [PATCH 6/8] make filter private --- pkg/rpc/core/utils.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/rpc/core/utils.go b/pkg/rpc/core/utils.go index 09a8a6e..44ab6e3 100644 --- a/pkg/rpc/core/utils.go +++ b/pkg/rpc/core/utils.go @@ -128,13 +128,13 @@ func getHeightFromEntry(field string, value []byte) (uint64, error) { return 0, fmt.Errorf("unknown field: %s", field) } -type BlockFilter struct { // needs this for the Filter interface +type blockFilter struct { // needs this for the Filter interface start int64 end int64 field string //need this field for differentiation between getting headers and getting data } -func (f *BlockFilter) Filter(e dsq.Entry) bool { +func (f *blockFilter) Filter(e dsq.Entry) bool { height, err := getHeightFromEntry(f.field, e.Value) if err != nil { return false @@ -148,8 +148,8 @@ func BlockIterator(start int64, end int64) []BlockResponse { if !ok { return blocks } - filterData := &BlockFilter{start: start, end: end, field: "data"} - filterHeader := &BlockFilter{start: start, end: end, field: "header"} + filterData := &blockFilter{start: start, end: end, field: "data"} + filterHeader := &blockFilter{start: start, end: end, field: "header"} // we need to do two queries, one for the block header and one for the block data qHeader := dsq.Query{ @@ -207,6 +207,8 @@ func BlockIterator(start int64, end int64) []BlockResponse { return blocks } +// BlockResponse represents a paired block header and data for efficient iteration. +// It's returned by BlockIterator to provide access to both components of a block. type BlockResponse struct { header *rlktypes.SignedHeader data *rlktypes.Data From b3b3026848dd5bfee2b3fac9596f2815694280f7 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 15 May 2025 21:28:01 +0200 Subject: [PATCH 7/8] improved naming and pass context to iterator --- pkg/rpc/core/blocks.go | 2 +- pkg/rpc/core/utils.go | 30 ++++++++++++++++++------------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/rpc/core/blocks.go b/pkg/rpc/core/blocks.go index b1d1afa..99cff9b 100644 --- a/pkg/rpc/core/blocks.go +++ b/pkg/rpc/core/blocks.go @@ -276,7 +276,7 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes. env.Logger.Debug("BlockchainInfo", "maxHeight", maxHeight, "minHeight", minHeight) blocks := make([]*cmttypes.BlockMeta, 0, maxHeight-minHeight+1) - for _, block := range BlockIterator(maxHeight, minHeight) { + for _, block := range BlockIterator(ctx.Context(), maxHeight, minHeight) { if block.header != nil && block.data != nil { cmblockmeta, err := common.ToABCIBlockMeta(block.header, block.data) if err != nil { diff --git a/pkg/rpc/core/utils.go b/pkg/rpc/core/utils.go index 44ab6e3..653692a 100644 --- a/pkg/rpc/core/utils.go +++ b/pkg/rpc/core/utils.go @@ -129,8 +129,8 @@ func getHeightFromEntry(field string, value []byte) (uint64, error) { } type blockFilter struct { // needs this for the Filter interface - start int64 - end int64 + max int64 + min int64 field string //need this field for differentiation between getting headers and getting data } @@ -139,17 +139,17 @@ func (f *blockFilter) Filter(e dsq.Entry) bool { if err != nil { return false } - return height >= uint64(f.end) && height <= uint64(f.start) + return height >= uint64(f.min) && height <= uint64(f.max) } -func BlockIterator(start int64, end int64) []BlockResponse { +func BlockIterator(ctx context.Context, max int64, min int64) []BlockResponse { var blocks []BlockResponse ds, ok := env.Adapter.RollkitStore.(ds.Batching) if !ok { return blocks } - filterData := &blockFilter{start: start, end: end, field: "data"} - filterHeader := &blockFilter{start: start, end: end, field: "header"} + filterData := &blockFilter{max: max, min: min, field: "data"} + filterHeader := &blockFilter{max: max, min: min, field: "header"} // we need to do two queries, one for the block header and one for the block data qHeader := dsq.Query{ @@ -162,11 +162,11 @@ func BlockIterator(start int64, end int64) []BlockResponse { } qData.Filters = append(qData.Filters, filterData) - rHeader, err := ds.Query(context.Background(), qHeader) + rHeader, err := ds.Query(ctx, qHeader) if err != nil { return blocks } - rData, err := ds.Query(context.Background(), qData) + rData, err := ds.Query(ctx, qData) if err != nil { return blocks } @@ -175,18 +175,24 @@ func BlockIterator(start int64, end int64) []BlockResponse { //we need to match the data to the header using the height, for that we use a map headerMap := make(map[uint64]*rlktypes.SignedHeader) - for h := range rHeader.Next() { + for res := range rHeader.Next() { + if res.Error != nil { + continue + } header := new(rlktypes.SignedHeader) - if err := header.UnmarshalBinary(h.Value); err != nil { + if err := header.UnmarshalBinary(res.Value); err != nil { continue } headerMap[header.Height()] = header } dataMap := make(map[uint64]*rlktypes.Data) - for d := range rData.Next() { + for res := range rData.Next() { + if res.Error != nil { + continue + } data := new(rlktypes.Data) - if err := data.UnmarshalBinary(d.Value); err != nil { + if err := data.UnmarshalBinary(res.Value); err != nil { continue } dataMap[data.Height()] = data From faea84dfbe5ea331db109829042fd587fbe68b8c Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 15 May 2025 21:53:57 +0200 Subject: [PATCH 8/8] added doc and accessors --- pkg/rpc/core/utils.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/rpc/core/utils.go b/pkg/rpc/core/utils.go index 653692a..fb3e53a 100644 --- a/pkg/rpc/core/utils.go +++ b/pkg/rpc/core/utils.go @@ -142,6 +142,11 @@ func (f *blockFilter) Filter(e dsq.Entry) bool { return height >= uint64(f.min) && height <= uint64(f.max) } +// BlockIterator returns a slice of BlockResponse objects containing paired headers and data +// for blocks within the specified height range. It efficiently retrieves blocks by performing +// only two datastore queries (one for headers, one for data) rather than querying each block +// individually. +// Returns a slice of BlockResponse objects sorted by height in descending order. func BlockIterator(ctx context.Context, max int64, min int64) []BlockResponse { var blocks []BlockResponse ds, ok := env.Adapter.RollkitStore.(ds.Batching) @@ -219,3 +224,13 @@ type BlockResponse struct { header *rlktypes.SignedHeader data *rlktypes.Data } + +// returns the block's signed header. +func (br BlockResponse) Header() *rlktypes.SignedHeader { + return br.header +} + +// returns the block's data. +func (br BlockResponse) Data() *rlktypes.Data { + return br.data +}