Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk syncing of missing BlockMetadata #2765

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions arbnode/blockmetadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package arbnode

import (
"bytes"
"context"
"encoding/binary"
"time"

"github.com/spf13/pflag"

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

// BlockMetadataFetcherConfig configures the background job that backfills
// missing blockMetadata entries from a bulk blockMetadata API source.
type BlockMetadataFetcherConfig struct {
	Enable         bool                   `koanf:"enable"`           // enables the blockMetadata fetcher
	Source         rpcclient.ClientConfig `koanf:"source"`           // RPC endpoint serving arb_getRawBlockMetadata
	SyncInterval   time.Duration          `koanf:"sync-interval"`    // how often a sync pass is attempted
	APIBlocksLimit uint64                 `koanf:"api-blocks-limit"` // max blocks per arb_getRawBlockMetadata query
}

// DefaultBlockMetadataFetcherConfig disables fetching by default; when
// enabled, a sync pass runs every 5 minutes querying at most 100 blocks
// per API call.
var DefaultBlockMetadataFetcherConfig = BlockMetadataFetcherConfig{
	Enable:         false,
	Source:         rpcclient.DefaultClientConfig,
	SyncInterval:   time.Minute * 5,
	APIBlocksLimit: 100,
}

// BlockMetadataFetcherConfigAddOptions registers the blockMetadata fetcher's
// command-line flags under the given prefix, using
// DefaultBlockMetadataFetcherConfig for default values.
func BlockMetadataFetcherConfigAddOptions(prefix string, f *pflag.FlagSet) {
	// Help-text fix: "retyring" -> "retrying".
	f.Bool(prefix+".enable", DefaultBlockMetadataFetcherConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api. If the source doesn't have the missing blockMetadata, we keep retrying in every sync-interval (default=5mins) duration")
	rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataFetcherConfig.Source)
	f.Duration(prefix+".sync-interval", DefaultBlockMetadataFetcherConfig.SyncInterval, "interval at which blockMetadata are synced regularly")
	// Help-text fix: "lesser than" -> "less than".
	f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataFetcherConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+
		"This should be set less than or equal to the limit on the api provider side")
}

// BlockMetadataFetcher periodically scans the database for message indices
// whose blockMetadata is missing and backfills them from a bulk
// blockMetadata API source.
type BlockMetadataFetcher struct {
	stopwaiter.StopWaiter
	config BlockMetadataFetcherConfig // fetcher configuration
	db     ethdb.Database             // database holding blockMetadata and missing-blockMetadata entries
	client *rpcclient.RpcClient       // client for the bulk blockMetadata source
	exec   execution.ExecutionClient  // maps block numbers <-> message indices
}

// NewBlockMetadataFetcher constructs a BlockMetadataFetcher and starts its
// underlying RPC client against the configured source. Returns an error if
// the client fails to start.
func NewBlockMetadataFetcher(ctx context.Context, c BlockMetadataFetcherConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataFetcher, error) {
	client := rpcclient.NewRpcClient(func() *rpcclient.ClientConfig { return &c.Source }, nil)
	if err := client.Start(ctx); err != nil {
		return nil, err
	}
	return &BlockMetadataFetcher{
		config: c,
		db:     db,
		client: client,
		exec:   exec,
	}, nil
}

// fetch queries the bulk blockMetadata API for the inclusive block range
// [fromBlock, toBlock] and returns the raw results.
func (b *BlockMetadataFetcher) fetch(ctx context.Context, fromBlock, toBlock uint64) ([]gethexec.NumberAndBlockMetadata, error) {
	var blockMetadata []gethexec.NumberAndBlockMetadata
	if err := b.client.CallContext(ctx, &blockMetadata, "arb_getRawBlockMetadata", rpc.BlockNumber(fromBlock), rpc.BlockNumber(toBlock)); err != nil {
		return nil, err
	}
	return blockMetadata, nil
}

// persistBlockMetadata writes the fetched blockMetadata entries to the
// database and deletes the corresponding missing-blockMetadata markers.
// Only results whose message index was actually in query are persisted,
// guarding against unexpected extra entries from the API. Writes are
// batched; the batch is flushed whenever it reaches ethdb.IdealBatchSize
// and once more at the end.
func (b *BlockMetadataFetcher) persistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error {
	batch := b.db.NewBatch()
	queryMap := util.ArrayToSet(query)
	for _, elem := range result {
		pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber)
		if err != nil {
			return err
		}
		// Only persist entries we asked for.
		if _, ok := queryMap[uint64(pos)]; ok {
			if err := batch.Put(dbKey(blockMetadataInputFeedPrefix, uint64(pos)), elem.RawMetadata); err != nil {
				return err
			}
			if err := batch.Delete(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))); err != nil {
				return err
			}
			// If we reached the ideal batch size, commit and reset
			if batch.ValueSize() >= ethdb.IdealBatchSize {
				if err := batch.Write(); err != nil {
					return err
				}
				batch.Reset()
			}
		}
	}
	return batch.Write()
}

// Update performs one sync pass: it iterates over all message indices marked
// as missing blockMetadata, groups them into batches whose block-number span
// is bounded by APIBlocksLimit, fetches blockMetadata for each batch from the
// source, and persists the results. It returns the delay until the next pass
// (always SyncInterval; failed batches are simply retried on the next pass).
func (b *BlockMetadataFetcher) Update(ctx context.Context) time.Duration {
	// handleQuery fetches and persists one contiguous batch of missing
	// message indices; returns false if the batch should be retried later.
	// Kept as a local closure (rather than a method) purely to deduplicate
	// the in-loop and post-loop call sites below.
	handleQuery := func(query []uint64) bool {
		result, err := b.fetch(
			ctx,
			b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])),
			b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])),
		)
		if err != nil {
			log.Error("Error getting result from bulk blockMetadata API", "err", err)
			return false
		}
		if err = b.persistBlockMetadata(query, result); err != nil {
			log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err)
			return false
		}
		return true
	}
	iter := b.db.NewIterator(missingBlockMetadataInputFeedPrefix, nil)
	defer iter.Release()
	var query []uint64
	for iter.Next() {
		keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix)
		query = append(query, binary.BigEndian.Uint64(keyBytes))
		end := len(query) - 1
		// The API is queried with an inclusive [from, to] block range, so the
		// limit applies to the span query[end]-query[0]+1, not to len(query).
		if query[end]-query[0]+1 >= uint64(b.config.APIBlocksLimit) {
			// If appending the latest key overshot the limit, leave that key
			// for the next batch (only possible with at least two keys).
			if query[end]-query[0]+1 > uint64(b.config.APIBlocksLimit) && len(query) >= 2 {
				end -= 1
			}
			if success := handleQuery(query[:end+1]); !success {
				// Abort this pass; remaining keys are retried next interval.
				return b.config.SyncInterval
			}
			query = query[end+1:]
		}
	}
	// Flush any remaining keys; a failure here is retried on the next pass.
	if len(query) > 0 {
		_ = handleQuery(query)
	}
	return b.config.SyncInterval
}

// Start launches the fetcher's StopWaiter and schedules Update to run
// repeatedly, using Update's return value as the delay between runs.
func (b *BlockMetadataFetcher) Start(ctx context.Context) {
	b.StopWaiter.Start(ctx, b)
	b.CallIteratively(b.Update)
}

// StopAndWait stops the iterative sync loop, waits for it to finish, and
// closes the RPC client connection to the blockMetadata source.
func (b *BlockMetadataFetcher) StopAndWait() {
	b.StopWaiter.StopAndWait()
	b.client.Close()
}
89 changes: 56 additions & 33 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,23 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com
}

type Config struct {
Sequencer bool `koanf:"sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"`
MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"`
BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"`
Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"`
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
Sequencer bool `koanf:"sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"`
MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"`
BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"`
Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"`
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
BlockMetadataFetcher BlockMetadataFetcherConfig `koanf:"block-metadata-fetcher" reload:"hot"`
// SnapSyncConfig is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}
Expand Down Expand Up @@ -129,6 +130,12 @@ func (c *Config) Validate() error {
if err := c.Staker.Validate(); err != nil {
return err
}
if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 {
return errors.New("when sequencer is enabled track-missing-block-metadata-from should be set as well")
}
if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataFetcher.Enable {
log.Warn("track-missing-block-metadata-from is set but blockMetadata fetcher is not enabled")
}
return nil
}

Expand Down Expand Up @@ -158,26 +165,28 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed
DangerousConfigAddOptions(prefix+".dangerous", f)
TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f)
MaintenanceConfigAddOptions(prefix+".maintenance", f)
BlockMetadataFetcherConfigAddOptions(prefix+"block-metadata-fetcher", f)
}

var ConfigDefault = Config{
Sequencer: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
BatchPoster: DefaultBatchPosterConfig,
MessagePruner: DefaultMessagePrunerConfig,
BlockValidator: staker.DefaultBlockValidatorConfig,
Feed: broadcastclient.FeedConfigDefault,
Staker: staker.DefaultL1ValidatorConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
SnapSyncTest: DefaultSnapSyncConfig,
Sequencer: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
BatchPoster: DefaultBatchPosterConfig,
MessagePruner: DefaultMessagePrunerConfig,
BlockValidator: staker.DefaultBlockValidatorConfig,
Feed: broadcastclient.FeedConfigDefault,
Staker: staker.DefaultL1ValidatorConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
BlockMetadataFetcher: DefaultBlockMetadataFetcherConfig,
SnapSyncTest: DefaultSnapSyncConfig,
}

func ConfigDefaultL1Test() *Config {
Expand Down Expand Up @@ -272,6 +281,7 @@ type Node struct {
MaintenanceRunner *MaintenanceRunner
DASLifecycleManager *das.LifecycleManager
SyncMonitor *SyncMonitor
blockMetadataFetcher *BlockMetadataFetcher
configFetcher ConfigFetcher
ctx context.Context
}
Expand Down Expand Up @@ -480,6 +490,14 @@ func createNodeImpl(
}
}

var blockMetadataFetcher *BlockMetadataFetcher
if config.BlockMetadataFetcher.Enable {
blockMetadataFetcher, err = NewBlockMetadataFetcher(ctx, config.BlockMetadataFetcher, arbDb, exec)
if err != nil {
return nil, err
}
}

if !config.ParentChainReader.Enable {
return &Node{
ArbDB: arbDb,
Expand All @@ -503,6 +521,7 @@ func createNodeImpl(
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: nil,
SyncMonitor: syncMonitor,
blockMetadataFetcher: blockMetadataFetcher,
configFetcher: configFetcher,
ctx: ctx,
}, nil
Expand Down Expand Up @@ -739,6 +758,7 @@ func createNodeImpl(
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: dasLifecycleManager,
SyncMonitor: syncMonitor,
blockMetadataFetcher: blockMetadataFetcher,
configFetcher: configFetcher,
ctx: ctx,
}, nil
Expand Down Expand Up @@ -922,6 +942,9 @@ func (n *Node) Start(ctx context.Context) error {
n.BroadcastClients.Start(ctx)
}()
}
if n.blockMetadataFetcher != nil {
n.blockMetadataFetcher.Start(ctx)
}
if n.configFetcher != nil {
n.configFetcher.Start(ctx)
}
Expand Down
19 changes: 10 additions & 9 deletions arbnode/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
package arbnode

var (
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
missingBlockMetadataInputFeedPrefix []byte = []byte("x") // maps a message sequence number whose blockMetaData byte array is missing to nil
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
Expand Down
20 changes: 20 additions & 0 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ type TransactionStreamer struct {
broadcastServer *broadcaster.Broadcaster
inboxReader *InboxReader
delayedBridge *DelayedBridge

trackBlockMetadataFrom arbutil.MessageIndex
}

type TransactionStreamerConfig struct {
MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"`
MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"`
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
TrackBlockMetadataFrom uint64 `koanf:"track-block-metadata-from"`
}

type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig
Expand All @@ -82,18 +85,21 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
TrackBlockMetadataFrom: 0,
}

var TestTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
TrackBlockMetadataFrom: 0,
}

func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages")
f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)")
f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "this is the block number starting from which missing of blockmetadata is being tracked in the local disk. Setting to zero (default value) disables this")
}

func NewTransactionStreamer(
Expand All @@ -119,6 +125,13 @@ func NewTransactionStreamer(
if err != nil {
return nil, err
}
if config().TrackBlockMetadataFrom != 0 {
trackBlockMetadataFrom, err := exec.BlockNumberToMessageIndex(config().TrackBlockMetadataFrom)
if err != nil {
return nil, err
}
streamer.trackBlockMetadataFrom = trackBlockMetadataFrom
}
return streamer, nil
}

Expand Down Expand Up @@ -386,6 +399,10 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
if err != nil {
return err
}
err = deleteStartingAt(s.db, batch, missingBlockMetadataInputFeedPrefix, uint64ToKey(uint64(count)))
if err != nil {
return err
}
err = deleteStartingAt(s.db, batch, messagePrefix, uint64ToKey(uint64(count)))
if err != nil {
return err
Expand Down Expand Up @@ -1045,6 +1062,9 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty
// This also allows update of BatchGasCost in message without mistakenly erasing BlockMetadata
key = dbKey(blockMetadataInputFeedPrefix, uint64(pos))
return batch.Put(key, msg.BlockMetadata)
} else if s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom {
key = dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))
return batch.Put(key, nil)
}
return nil
}
Expand Down
Loading
Loading