Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk syncing of missing BlockMetadata #2765

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions arbnode/blockmetadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package arbnode

import (
"bytes"
"context"
"encoding/binary"
"time"

"github.com/spf13/pflag"

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

// BlockMetadataRebuilderConfig configures the background job that backfills
// missing blockMetadata entries from a trusted bulk-API source.
type BlockMetadataRebuilderConfig struct {
Enable bool `koanf:"enable"` // turns the rebuilder on
Source rpcclient.ClientConfig `koanf:"source"` // RPC endpoint serving arb_getRawBlockMetadata
SyncInterval time.Duration `koanf:"sync-interval"` // delay between successive Update runs
APIBlocksLimit uint64 `koanf:"api-blocks-limit"` // max blocks per arb_getRawBlockMetadata query
}

// DefaultBlockMetadataRebuilderConfig is the default configuration: rebuilding
// disabled, syncing every 5 minutes in chunks of at most 100 blocks.
var DefaultBlockMetadataRebuilderConfig = BlockMetadataRebuilderConfig{
Enable: false,
Source: rpcclient.DefaultClientConfig,
SyncInterval: time.Minute * 5,
APIBlocksLimit: 100,
}

// BlockMetadataRebuilderConfigAddOptions registers the rebuilder's CLI flags
// under the given prefix. Flag names must match the koanf tags on
// BlockMetadataRebuilderConfig so that koanf can bind them to the struct.
func BlockMetadataRebuilderConfigAddOptions(prefix string, f *pflag.FlagSet) {
	f.Bool(prefix+".enable", DefaultBlockMetadataRebuilderConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api")
	rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataRebuilderConfig.Source)
	// Fix: this flag was registered as ".rebuild-interval", which does not match
	// the `koanf:"sync-interval"` tag on SyncInterval, so the value set on the
	// command line was never bound to the config field.
	f.Duration(prefix+".sync-interval", DefaultBlockMetadataRebuilderConfig.SyncInterval, "interval at which blockMetadata are synced regularly")
	f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataRebuilderConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+
		"This should be set lesser than or equal to the limit on the api provider side")
}

type BlockMetadataRebuilder struct {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
stopwaiter.StopWaiter
config BlockMetadataRebuilderConfig
db ethdb.Database
client *rpcclient.RpcClient
exec execution.ExecutionClient
}

// NewBlockMetadataRebuilder constructs a rebuilder and starts the RPC client
// connection to the configured blockMetadata source. It returns an error if
// the client fails to start.
func NewBlockMetadataRebuilder(ctx context.Context, c BlockMetadataRebuilderConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataRebuilder, error) {
	fetchSourceConfig := func() *rpcclient.ClientConfig { return &c.Source }
	rpcClient := rpcclient.NewRpcClient(fetchSourceConfig, nil)
	if err := rpcClient.Start(ctx); err != nil {
		return nil, err
	}
	rebuilder := &BlockMetadataRebuilder{
		config: c,
		db:     db,
		client: rpcClient,
		exec:   exec,
	}
	return rebuilder, nil
}

func (b *BlockMetadataRebuilder) Fetch(ctx context.Context, fromBlock, toBlock uint64) ([]gethexec.NumberAndBlockMetadata, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: why to export this func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason was to enable testing of each components separately (which was done and later replaced with a system test).
But there really is no preference for me here, can always export these functions later if needed- going to make these private now

var result []gethexec.NumberAndBlockMetadata
err := b.client.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(fromBlock), rpc.BlockNumber(toBlock))
if err != nil {
return nil, err
}
return result, nil
}

// ArrayToMap converts a slice into a set (a map with empty-struct values) for
// O(1) membership checks. Duplicate elements collapse to a single key.
// (Defect fixed: review-UI text was interleaved inside this function.)
func ArrayToMap[T comparable](arr []T) map[T]struct{} {
	// Pre-size to the upper bound to avoid incremental map growth.
	ret := make(map[T]struct{}, len(arr))
	for _, elem := range arr {
		ret[elem] = struct{}{}
	}
	return ret
}

func (b *BlockMetadataRebuilder) PersistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
batch := b.db.NewBatch()
queryMap := ArrayToMap(query)
for _, elem := range result {
pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber)
if err != nil {
return err
}
if _, ok := queryMap[uint64(pos)]; ok {
if err := batch.Put(dbKey(blockMetadataInputFeedPrefix, uint64(pos)), elem.RawMetadata); err != nil {
return err
}
if err := batch.Delete(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))); err != nil {
return err
}
// If we exceeded the ideal batch size, commit and reset
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return err
}
batch.Reset()
}
}
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
return batch.Write()
}

func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration {
handleQuery := func(query []uint64) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: why not defining handleQuery outside of the Update function?
I mean, why handleQuery is created as a variable while Fetch and PersistBlockMetadata are not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both fetch and persistBlockMetadata each have their own goals to achieve, where as handleQuery is just a way to reduce code-duplication. So I preferred declaring it as a variable inside the main Update function

result, err := b.Fetch(
ctx,
b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])),
b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])),
)
if err != nil {
log.Error("Error getting result from bulk blockMetadata API", "err", err)
return false
}
if err = b.PersistBlockMetadata(query, result); err != nil {
log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err)
return false
}
return true
}
iter := b.db.NewIterator(missingBlockMetadataInputFeedPrefix, nil)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
defer iter.Release()
var query []uint64
for iter.Next() {
keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
query = append(query, binary.BigEndian.Uint64(keyBytes))
end := len(query) - 1
if query[end]-query[0]+1 >= uint64(b.config.APIBlocksLimit) {
if query[end]-query[0]+1 > uint64(b.config.APIBlocksLimit) && len(query) >= 2 {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
end -= 1
}
if success := handleQuery(query[:end+1]); !success {
return b.config.SyncInterval
}
query = query[end+1:]
}
}
if len(query) > 0 {
if success := handleQuery(query); !success {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return b.config.SyncInterval
}
}
return b.config.SyncInterval
}

// Start launches the rebuilder's background loop, which invokes Update
// repeatedly, sleeping between calls for the duration Update returns.
func (b *BlockMetadataRebuilder) Start(ctx context.Context) {
b.StopWaiter.Start(ctx, b)
b.CallIteratively(b.Update)
}

// StopAndWait stops the background loop, waits for it to exit, and then
// closes the RPC client connection to the blockMetadata source.
func (b *BlockMetadataRebuilder) StopAndWait() {
b.StopWaiter.StopAndWait()
b.client.Close()
}
89 changes: 56 additions & 33 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,23 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com
}

type Config struct {
Sequencer bool `koanf:"sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"`
MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"`
BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"`
Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"`
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
Sequencer bool `koanf:"sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"`
MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"`
BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"`
Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"`
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
BlockMetadataRebuilder BlockMetadataRebuilderConfig `koanf:"block-metadata-rebuilder" reload:"hot"`
// SnapSyncConfig is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}
Expand Down Expand Up @@ -129,6 +130,12 @@ func (c *Config) Validate() error {
if err := c.Staker.Validate(); err != nil {
return err
}
if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 {
return errors.New("when sequencer is enabled track-missing-block-metadata should be enabled as well")
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataRebuilder.Enable {
log.Warn("track-missing-block-metadata is set but blockMetadata rebuilder is not enabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick:

Suggested change
log.Warn("track-missing-block-metadata is set but blockMetadata rebuilder is not enabled")
log.Warn("track-missing-block-metadata-from is set but blockMetadata rebuilder is not enabled")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not returning an error here too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need both TrackBlockMetadataFrom and BlockMetadataRebuilder.Enable?
Why BlockMetadataRebuilder.Enable is not enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed with Magic regarding the same question in this thread- #2765 (comment)

}
return nil
}

Expand Down Expand Up @@ -158,26 +165,28 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed
DangerousConfigAddOptions(prefix+".dangerous", f)
TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f)
MaintenanceConfigAddOptions(prefix+".maintenance", f)
BlockMetadataRebuilderConfigAddOptions(prefix+"block-metadata-rebuilder", f)
}

var ConfigDefault = Config{
Sequencer: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
BatchPoster: DefaultBatchPosterConfig,
MessagePruner: DefaultMessagePrunerConfig,
BlockValidator: staker.DefaultBlockValidatorConfig,
Feed: broadcastclient.FeedConfigDefault,
Staker: staker.DefaultL1ValidatorConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
SnapSyncTest: DefaultSnapSyncConfig,
Sequencer: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
BatchPoster: DefaultBatchPosterConfig,
MessagePruner: DefaultMessagePrunerConfig,
BlockValidator: staker.DefaultBlockValidatorConfig,
Feed: broadcastclient.FeedConfigDefault,
Staker: staker.DefaultL1ValidatorConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
BlockMetadataRebuilder: DefaultBlockMetadataRebuilderConfig,
SnapSyncTest: DefaultSnapSyncConfig,
}

func ConfigDefaultL1Test() *Config {
Expand Down Expand Up @@ -272,6 +281,7 @@ type Node struct {
MaintenanceRunner *MaintenanceRunner
DASLifecycleManager *das.LifecycleManager
SyncMonitor *SyncMonitor
blockMetadataRebuilder *BlockMetadataRebuilder
configFetcher ConfigFetcher
ctx context.Context
}
Expand Down Expand Up @@ -480,6 +490,14 @@ func createNodeImpl(
}
}

var blockMetadataRebuilder *BlockMetadataRebuilder
if config.BlockMetadataRebuilder.Enable {
blockMetadataRebuilder, err = NewBlockMetadataRebuilder(ctx, config.BlockMetadataRebuilder, arbDb, exec)
if err != nil {
return nil, err
}
}

if !config.ParentChainReader.Enable {
return &Node{
ArbDB: arbDb,
Expand All @@ -503,6 +521,7 @@ func createNodeImpl(
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: nil,
SyncMonitor: syncMonitor,
blockMetadataRebuilder: blockMetadataRebuilder,
configFetcher: configFetcher,
ctx: ctx,
}, nil
Expand Down Expand Up @@ -739,6 +758,7 @@ func createNodeImpl(
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: dasLifecycleManager,
SyncMonitor: syncMonitor,
blockMetadataRebuilder: blockMetadataRebuilder,
configFetcher: configFetcher,
ctx: ctx,
}, nil
Expand Down Expand Up @@ -922,6 +942,9 @@ func (n *Node) Start(ctx context.Context) error {
n.BroadcastClients.Start(ctx)
}()
}
if n.blockMetadataRebuilder != nil {
n.blockMetadataRebuilder.Start(ctx)
}
if n.configFetcher != nil {
n.configFetcher.Start(ctx)
}
Expand Down
19 changes: 10 additions & 9 deletions arbnode/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
package arbnode

var (
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
missingBlockMetadataInputFeedPrefix []byte = []byte("xt") // maps a message sequence number whose blockMetaData byte array is missing to nil. Leading "x" implies we are tracking the missing of such a data point
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
Expand Down
Loading
Loading