Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2485] snap-sync v0.3: fetch snap sync parameters on the fly #2318

Open
wants to merge 7 commits into
base: snap_sync_0.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
tracker, err := NewInboxTracker(db, streamer, nil)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
40 changes: 38 additions & 2 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type InboxReaderConfig struct {
TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"`
MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"`
ReadMode string `koanf:"read-mode" reload:"hot"`
EnableSnapSync bool `koanf:"enable-snap-sync"`
// SnapSyncTest is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}

type InboxReaderConfigFetcher func() *InboxReaderConfig
Expand All @@ -57,6 +60,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once")
f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once")
f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized")
f.Bool(prefix+".enable-snap-sync", false, "enable snap sync")
}

var DefaultInboxReaderConfig = InboxReaderConfig{
Expand Down Expand Up @@ -96,13 +100,14 @@ type InboxReader struct {
caughtUpChan chan struct{}
client *ethclient.Client
l1Reader *headerreader.HeaderReader
rollupAddress common.Address

// Atomic
lastSeenBatchCount atomic.Uint64
lastReadBatchCount atomic.Uint64
}

func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, rollupAddress common.Address, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
err := config().Validate()
if err != nil {
return nil, err
Expand All @@ -116,12 +121,37 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h
firstMessageBlock: firstMessageBlock,
caughtUpChan: make(chan struct{}),
config: config,
rollupAddress: rollupAddress,
}, nil
}

func (r *InboxReader) Start(ctxIn context.Context) error {
r.StopWaiter.Start(ctxIn, r)
hadError := false
if r.config().EnableSnapSync {
batchCount, err := r.tracker.GetBatchCount()
if err != nil {
return err
}
if batchCount == 0 {
snapSyncConfig := r.fetchSnapSyncParameters()
r.tracker.SetSnapSyncParameters(snapSyncConfig)
r.tracker.txStreamer.SetSnapSyncParameters(snapSyncConfig)
snapSyncBatchCount := snapSyncConfig.BatchCount
// Find the first block containing the batch count.
// Subtract 1 to get the block before the needed batch count,
// this is done to fetch previous batch metadata needed for snap sync.
if snapSyncBatchCount > 0 {
snapSyncBatchCount--
}
block, err := FindBlockContainingBatchCount(ctxIn, r.rollupAddress, r.client, snapSyncConfig.ParentChainAssertionBlock, snapSyncBatchCount)
if err != nil {
return err
}
r.firstMessageBlock.SetUint64(block)
}

}
r.CallIteratively(func(ctx context.Context) time.Duration {
err := r.run(ctx, hadError)
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
Expand All @@ -140,7 +170,7 @@ func (r *InboxReader) Start(ctxIn context.Context) error {
return err
}
if batchCount > 0 {
if r.tracker.snapSyncConfig.Enabled {
if r.config().EnableSnapSync {
break
}
// Validate the init message matches our L2 blockchain
Expand Down Expand Up @@ -177,6 +207,12 @@ func (r *InboxReader) Start(ctxIn context.Context) error {
return nil
}

func (r *InboxReader) fetchSnapSyncParameters() SnapSyncConfig {
// In the future, we will implement a way to fetch this is from other nodes,
// but for now we will just use the test config
return r.config().SnapSyncTest
}

// assumes l1block is recent so we could do a simple-search from the end
func (r *InboxReader) recentParentChainBlockToMsg(ctx context.Context, parentChainBlock uint64) (arbutil.MessageIndex, error) {
batch, err := r.tracker.GetBatchCount()
Expand Down
2 changes: 1 addition & 1 deletion arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
Fail(t, err)
}
execSeq := &execClientWrapper{execEngine, t}
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher)
if err != nil {
Fail(t, err)
}
Expand Down
15 changes: 9 additions & 6 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ type InboxTracker struct {
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader) (*InboxTracker, error) {
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
snapSyncConfig: snapSyncConfig,
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
}
Expand Down Expand Up @@ -226,6 +225,10 @@ func (t *InboxTracker) GetBatchCount() (uint64, error) {
return count, nil
}

func (t *InboxTracker) SetSnapSyncParameters(config SnapSyncConfig) {
t.snapSyncConfig = config
}

// err will return unexpected/internal errors
// bool will be false if batch not found (meaning, block not yet posted on a batch)
func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) (uint64, bool, error) {
Expand Down
35 changes: 4 additions & 31 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ type Config struct {
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
// SnapSyncConfig is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}

func (c *Config) Validate() error {
Expand Down Expand Up @@ -178,7 +176,6 @@ var ConfigDefault = Config{
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
SnapSyncTest: DefaultSnapSyncConfig,
}

func ConfigDefaultL1Test() *Config {
Expand Down Expand Up @@ -286,15 +283,6 @@ type SnapSyncConfig struct {
ParentChainAssertionBlock uint64
}

var DefaultSnapSyncConfig = SnapSyncConfig{
Enabled: false,
PrevBatchMessageCount: 0,
PrevDelayedRead: 0,
BatchCount: 0,
DelayedCount: 0,
ParentChainAssertionBlock: 0,
}

type ConfigFetcher interface {
Get() *Config
Start(context.Context)
Expand Down Expand Up @@ -455,7 +443,7 @@ func createNodeImpl(
}

transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -587,26 +575,11 @@ func createNodeImpl(
if blobReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
}
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSyncTest)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders)
if err != nil {
return nil, err
}
firstMessageBlock := new(big.Int).SetUint64(deployInfo.DeployedAt)
if config.SnapSyncTest.Enabled {
batchCount := config.SnapSyncTest.BatchCount
// Find the first block containing the batch count.
// Subtract 1 to get the block before the needed batch count,
// this is done to fetch previous batch metadata needed for snap sync.
if batchCount > 0 {
batchCount--
}
block, err := FindBlockContainingBatchCount(ctx, deployInfo.Bridge, l1client, config.SnapSyncTest.ParentChainAssertionBlock, batchCount)
if err != nil {
return nil, err
}
firstMessageBlock.SetUint64(block)
}
inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, firstMessageBlock, delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader })
inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), deployInfo.Rollup, delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader })
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -787,7 +760,7 @@ func createNodeImpl(
}, nil
}

func FindBlockContainingBatchCount(ctx context.Context, bridgeAddress common.Address, l1Client arbutil.L1Interface, parentChainAssertionBlock uint64, batchCount uint64) (uint64, error) {
func FindBlockContainingBatchCount(ctx context.Context, bridgeAddress common.Address, l1Client *ethclient.Client, parentChainAssertionBlock uint64, batchCount uint64) (uint64, error) {
bridge, err := bridgegen.NewIBridge(bridgeAddress, l1Client)
if err != nil {
return 0, err
Expand Down
8 changes: 5 additions & 3 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type TransactionStreamer struct {
db ethdb.Database
fatalErrChan chan<- error
config TransactionStreamerConfigFetcher
snapSyncConfig *SnapSyncConfig
snapSyncConfig SnapSyncConfig

insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held
reorgMutex sync.RWMutex
Expand Down Expand Up @@ -103,7 +103,6 @@ func NewTransactionStreamer(
broadcastServer *broadcaster.Broadcaster,
fatalErrChan chan<- error,
config TransactionStreamerConfigFetcher,
snapSyncConfig *SnapSyncConfig,
) (*TransactionStreamer, error) {
streamer := &TransactionStreamer{
exec: exec,
Expand All @@ -113,7 +112,6 @@ func NewTransactionStreamer(
broadcastServer: broadcastServer,
fatalErrChan: fatalErrChan,
config: config,
snapSyncConfig: snapSyncConfig,
}
err := streamer.cleanupInconsistentState()
if err != nil {
Expand Down Expand Up @@ -697,6 +695,10 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m
return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messagesWithBlockHash, batch)
}

func (s *TransactionStreamer) SetSnapSyncParameters(config SnapSyncConfig) {
s.snapSyncConfig = config
}

func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) {
if s.snapSyncConfig.Enabled && uint64(pos) == s.snapSyncConfig.PrevBatchMessageCount {
return s.snapSyncConfig.PrevDelayedRead, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return nil, fmt.Errorf("failed to get finalized block: %w", err)
}
l1BlockNum := l1Block.NumberU64()
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, arbnode.DefaultSnapSyncConfig)
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil)
if err != nil {
return nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions system_tests/snap_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestSnapSync(t *testing.T) {
t.Error("Block hash mismatch")
}
// This to ensure that the node did a snap sync and did not sync the batch before the snap sync batch.
_, err = nodeC.ConsensusNode.InboxTracker.GetBatchMetadata(nodeConfig.SnapSyncTest.BatchCount - 3)
_, err = nodeC.ConsensusNode.InboxTracker.GetBatchMetadata(nodeConfig.InboxReader.SnapSyncTest.BatchCount - 3)
if err == nil {
t.Error("Batch metadata should not be present for the batch before the snap sync batch")
}
Expand Down Expand Up @@ -191,10 +191,11 @@ func createNodeConfigWithSnapSync(t *testing.T, builder *NodeBuilder) *arbnode.C
Require(t, err)
// Create a config with snap sync enabled and same database directory as the 2nd node
nodeConfig := builder.nodeConfig
nodeConfig.SnapSyncTest.Enabled = true
nodeConfig.SnapSyncTest.BatchCount = batchCount
nodeConfig.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1
nodeConfig.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead
nodeConfig.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount)
nodeConfig.InboxReader.EnableSnapSync = true
nodeConfig.InboxReader.SnapSyncTest.Enabled = true
nodeConfig.InboxReader.SnapSyncTest.BatchCount = batchCount
nodeConfig.InboxReader.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1
nodeConfig.InboxReader.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead
nodeConfig.InboxReader.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount)
return nodeConfig
}
Loading