diff --git a/CHANGELOG.md b/CHANGELOG.md index 987f55c41..04ba8ab34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,17 +1,20 @@ -# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-05-01) +# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-05-02) ### Bug Fixes * **celestia test:** fix race in test ([#755](https://github.com/dymensionxyz/dymint/issues/755)) ([0b36781](https://github.com/dymensionxyz/dymint/commit/0b367818bf6aa8da4a4fd8e4e5c78223b60b44e0)) * **celestia:** impl retry on submit ([#748](https://github.com/dymensionxyz/dymint/issues/748)) ([61630eb](https://github.com/dymensionxyz/dymint/commit/61630eb458197abe2440a81426210000dff25d40)) +* **celestia:** use fixed delay in repeat attempts ([#753](https://github.com/dymensionxyz/dymint/issues/753)) ([53002b0](https://github.com/dymensionxyz/dymint/commit/53002b0a070743811295a98580ba038cac40cc7d)) * **da:** fixed da path seperator and encoding issue ([#731](https://github.com/dymensionxyz/dymint/issues/731)) ([3a3b219](https://github.com/dymensionxyz/dymint/commit/3a3b21932750fee7eaaa9c186f78e36e3e597746)) * **DA:** use expo backoff in retries ([#739](https://github.com/dymensionxyz/dymint/issues/739)) ([848085f](https://github.com/dymensionxyz/dymint/commit/848085f70bcaae81fb80da3ab78c4d8b399e13b1)) +* **doc:** manager cache comment ([#767](https://github.com/dymensionxyz/dymint/issues/767)) ([b88bf6e](https://github.com/dymensionxyz/dymint/commit/b88bf6e72820c944b290147724255cc8466ada50)) * **logging:** added reason for websocket closed debug msg ([#746](https://github.com/dymensionxyz/dymint/issues/746)) ([3aa7d80](https://github.com/dymensionxyz/dymint/commit/3aa7d80ace92b3b0f79e4f338f10bb94c96ab6dd)) * **logs:** make logs more readable in a couple places, fix race cond ([#749](https://github.com/dymensionxyz/dymint/issues/749)) ([f05ef39](https://github.com/dymensionxyz/dymint/commit/f05ef3957b754c05fbc90aa39eabce80bbe65933)) * **p2p:** validate block before applying and not before caching in p2p gossiping ([#723](https://github.com/dymensionxyz/dymint/issues/723)) ([98371b5](https://github.com/dymensionxyz/dymint/commit/98371b5220613e70f3274fab5593e02ba532f7db)) * **produce loop:** handle unauthenticated error in settlement layer ([#726](https://github.com/dymensionxyz/dymint/issues/726)) ([33e78d1](https://github.com/dymensionxyz/dymint/commit/33e78d116b5f14b91b8b3bda2b6cbfee9040e2d3)) * **rpc:** nil panic in rpc/json/handler.go WriteError ([#750](https://github.com/dymensionxyz/dymint/issues/750)) ([e09709b](https://github.com/dymensionxyz/dymint/commit/e09709b428a33da002defb9f13178fa19b81a69b)) +* **sync:** make sure we use a latest state index as a start point ([#760](https://github.com/dymensionxyz/dymint/issues/760)) ([43e2d96](https://github.com/dymensionxyz/dymint/commit/43e2d965f2b505751f8e5260549e909c976141ee)) diff --git a/block/manager.go b/block/manager.go index 28353c55b..21aec46c8 100644 --- a/block/manager.go +++ b/block/manager.go @@ -86,7 +86,8 @@ type Manager struct { logger types.Logger - // Cached blocks and commits for applying at future heights. Invariant: the block and commit are .Valid() (validated sigs etc) + // Cached blocks and commits for applying at future heights. The blocks may not be valid, because + // we can only do full validation in sequential order. 
blockCache map[uint64]CachedBlock } @@ -167,10 +168,13 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error { } } + if !isAggregator { + go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger) + } + err := m.syncBlockManager() if err != nil { - err = fmt.Errorf("sync block manager: %w", err) - return err + return fmt.Errorf("sync block manager: %w", err) } if isAggregator { @@ -178,7 +182,6 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error { go m.ProduceBlockLoop(ctx) go m.SubmitLoop(ctx) } else { - go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100) go m.RetrieveLoop(ctx) go m.SyncTargetLoop(ctx) } @@ -237,7 +240,6 @@ func (m *Manager) onNodeHealthStatus(event pubsub.Message) { // onNewGossippedBlock will take a block and apply it func (m *Manager) onNewGossipedBlock(event pubsub.Message) { m.retrieverMutex.Lock() // needed to protect blockCache access - m.logger.Debug("Received new block via gossip", "n cachedBlocks", len(m.blockCache)) eventData := event.Data().(p2p.GossipedBlock) block := eventData.Block @@ -251,9 +253,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) { } m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.Store.Height()) } - m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant - if block.Header.Height == nextHeight { err := m.attemptApplyCachedBlocks() if err != nil { diff --git a/block/manager_test.go b/block/manager_test.go index dd2c738e0..3f8c6fc60 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -95,7 +95,6 @@ func TestInitialState(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - dalc := testutil.GetMockDALC(logger) agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc, nil, pubsubServer, p2pClient, logger) @@ -162,7 +161,6 @@ func TestRetrieveDaBatchesFailed(t *testing.T) { require.NoError(t, err) require.NotNil(t, manager) - t.Log(manager.LastState.SLStateIndex) daMetaData := &da.DASubmitMetaData{ Client: da.Mock, Height: 1, @@ -472,7 +470,6 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) { } func TestDAFetch(t *testing.T) { - require := require.New(t) // Setup app app := testutil.GetAppMock(testutil.Info, testutil.Commit) diff --git a/block/retriever.go b/block/retriever.go index 29c254067..4247f08a1 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -3,7 +3,6 @@ package block import ( "context" "fmt" - "sync/atomic" "time" "github.com/avast/retry-go/v4" @@ -12,8 +11,9 @@ import ( "github.com/dymensionxyz/dymint/da" ) -// RetrieveLoop listens for new sync messages written to a ring buffer and in turn -// runs syncUntilTarget on the latest message in the ring buffer. +// RetrieveLoop listens for new target sync heights and then syncs the chain by +// fetching batches from the settlement layer and then fetching the actual blocks +// from the DA. 
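Note on the retriever change: `RetrieveLoop` (below) only ever cares about the newest sync target, and it gets that "latest value wins" behaviour from a go-diodes poller over `m.SyncTargetDiode`. A minimal channel-based analogue of the same idea, using only hypothetical names and no dymint types, might look like:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// latestTarget keeps only the most recent sync target; writers overwrite and the
// reader never queues up behind stale values. Purely illustrative — the real
// implementation in this PR polls a diode instead.
type latestTarget struct {
	ch chan uint64
}

func newLatestTarget() *latestTarget { return &latestTarget{ch: make(chan uint64, 1)} }

// Set replaces any pending target with the new one.
func (l *latestTarget) Set(h uint64) {
	select {
	case <-l.ch: // drop the stale target, if any
	default:
	}
	l.ch <- h
}

// Next blocks until a target is available.
func (l *latestTarget) Next() uint64 { return <-l.ch }

func main() {
	var storeHeight atomic.Uint64
	targets := newLatestTarget()

	// Producer: pretend the settlement layer accepted batches up to height 10, then 25.
	go func() {
		targets.Set(10)
		targets.Set(25) // overwrites 10 before the consumer reads it
	}()

	// Consumer: analogous to RetrieveLoop — it always syncs to the latest target only.
	time.Sleep(50 * time.Millisecond)
	target := targets.Next()
	for storeHeight.Load() < target {
		storeHeight.Add(1) // stand-in for fetching a batch and applying its blocks
	}
	fmt.Println("synced to", storeHeight.Load()) // synced to 25
}
```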
func (m *Manager) RetrieveLoop(ctx context.Context) { m.logger.Info("started retrieve loop") syncTargetPoller := diodes.NewPoller(m.SyncTargetDiode, diodes.WithPollingContext(ctx)) @@ -24,8 +24,8 @@ func (m *Manager) RetrieveLoop(ctx context.Context) { return default: // Get only the latest sync target - syncTarget := syncTargetPoller.Next() - err := m.syncUntilTarget(*(*uint64)(syncTarget)) + targetHeight := syncTargetPoller.Next() + err := m.syncUntilTarget(*(*uint64)(targetHeight)) if err != nil { panic(fmt.Errorf("sync until target: %w", err)) } @@ -33,79 +33,62 @@ func (m *Manager) RetrieveLoop(ctx context.Context) { } } -// syncUntilTarget syncs the block until the syncTarget is reached. +// syncUntilTarget syncs blocks until the target height is reached. // It fetches the batches from the settlement, gets the DA height and gets // the actual blocks from the DA. -func (m *Manager) syncUntilTarget(syncTarget uint64) error { - currentHeight := m.Store.Height() +func (m *Manager) syncUntilTarget(targetHeight uint64) error { + for currH := m.Store.Height(); currH < targetHeight; { - if currentHeight >= syncTarget { - m.logger.Info("Already synced", "current height", currentHeight, "syncTarget", syncTarget) - return nil - } - - var stateIndex uint64 - h := m.Store.Height() - err := retry.Do( - func() error { - res, err := m.SLClient.GetHeightState(h) - if err != nil { - m.logger.Debug("sl client get height state", "error", err) - return err - } - stateIndex = res.State.StateIndex - return nil - }, - retry.Attempts(0), - retry.Delay(500*time.Millisecond), - retry.LastErrorOnly(true), - retry.DelayType(retry.FixedDelay), - ) - if err != nil { - return fmt.Errorf("get height state: %w", err) - } - m.updateStateIndex(stateIndex - 1) - m.logger.Debug("Sync until target: updated state index pre loop", "stateIndex", stateIndex, "height", h, "syncTarget", syncTarget) - - for currentHeight < syncTarget { - currStateIdx := atomic.LoadUint64(&m.LastState.SLStateIndex) + 1 - m.logger.Info("Syncing until target", "height", currentHeight, "state_index", currStateIdx, "syncTarget", syncTarget) - settlementBatch, err := m.SLClient.RetrieveBatch(currStateIdx) + // It's important that we query the state index before fetching the batch, rather + // than e.g. keep it and increment it, because we might be concurrently applying blocks + // and may require a higher index than expected. 
+ stateIndex, err := m.queryStateIndex() if err != nil { - return err + return fmt.Errorf("query state index: %w", err) } - err = m.ProcessNextDABatch(settlementBatch.MetaData.DA) + settlementBatch, err := m.SLClient.RetrieveBatch(stateIndex) if err != nil { - return err + return fmt.Errorf("retrieve batch: %w", err) } - currentHeight = m.Store.Height() + m.logger.Info("Retrieved batch.", "state_index", stateIndex) - err = m.updateStateIndex(settlementBatch.StateIndex) + err = m.ProcessNextDABatch(settlementBatch.MetaData.DA) if err != nil { - return err + return fmt.Errorf("process next DA batch: %w", err) } + } - m.logger.Info("Synced", "current height", currentHeight, "syncTarget", syncTarget) - // check for cached blocks - err = m.attemptApplyCachedBlocks() + m.logger.Info("Synced", "store height", m.Store.Height(), "target height", targetHeight) + + err := m.attemptApplyCachedBlocks() if err != nil { - m.logger.Error("applying previous cached blocks", "err", err) + m.logger.Error("Attempt apply cached blocks.", "err", err) } return nil } -func (m *Manager) updateStateIndex(stateIndex uint64) error { - atomic.StoreUint64(&m.LastState.SLStateIndex, stateIndex) - _, err := m.Store.UpdateState(m.LastState, nil) - if err != nil { - m.logger.Error("update state", "error", err) - return err - } - return nil +// TODO: we could encapsulate the retry in the SL client +func (m *Manager) queryStateIndex() (uint64, error) { + var stateIndex uint64 + return stateIndex, retry.Do( + func() error { + res, err := m.SLClient.GetHeightState(m.Store.Height() + 1) + if err != nil { + m.logger.Debug("sl client get height state", "error", err) + return err + } + stateIndex = res.State.StateIndex + return nil + }, + retry.Attempts(0), // try forever + retry.Delay(500*time.Millisecond), + retry.LastErrorOnly(true), + retry.DelayType(retry.FixedDelay), + ) } func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error { diff --git a/block/state.go b/block/state.go index 2c3d5224f..6c5770b65 100644 --- a/block/state.go +++ b/block/state.go @@ -37,7 +37,6 @@ func (e *Executor) updateState(state types.State, block *types.Block, abciRespon Version: state.Version, ChainID: state.ChainID, InitialHeight: state.InitialHeight, - SLStateIndex: state.SLStateIndex, LastBlockHeight: int64(block.Header.Height), LastBlockTime: time.Unix(0, int64(block.Header.Time)), LastBlockID: tmtypes.BlockID{ diff --git a/block/submit.go b/block/submit.go index e9042c69c..e374b997e 100644 --- a/block/submit.go +++ b/block/submit.go @@ -12,6 +12,7 @@ import ( "github.com/dymensionxyz/dymint/types" ) +// SubmitLoop submits a batch of blocks to the DA and SL layers on a time interval. func (m *Manager) SubmitLoop(ctx context.Context) { ticker := time.NewTicker(m.Conf.BatchSubmitMaxTime) defer ticker.Stop() @@ -38,26 +39,25 @@ func (m *Manager) SubmitLoop(ctx context.Context) { } } -// HandleSubmissionTrigger processes the submission trigger event. It checks if there are new blocks produced since the last submission. +// HandleSubmissionTrigger processes the sublayer submission trigger event. It checks if there are new blocks produced since the last submission. // If there are, it attempts to submit a batch of blocks. It then attempts to produce an empty block to ensure IBC messages // pass through during the batch submission process due to proofs requires for ibc messages only exist on the next block. 
-// Finally, it submits the next batch of blocks and updates the sync target to the height of -// the last block in the submitted batch. +// Finally, it submits the next batch of blocks and updates the sync target to the height of the last block in the submitted batch. func (m *Manager) HandleSubmissionTrigger(ctx context.Context) error { if !m.submitBatchMutex.TryLock() { return fmt.Errorf("batch submission already in process, skipping submission: %w", gerr.ErrAborted) } - defer m.submitBatchMutex.Unlock() // Ensure unlocking at the end + defer m.submitBatchMutex.Unlock() // Load current sync target and height to determine if new blocks are available for submission. - syncTarget, height := m.SyncTarget.Load(), m.Store.Height() - if height <= syncTarget { // Check if there are new blocks since last sync target. - return nil // Exit if no new blocks are produced. + if m.Store.Height() <= m.SyncTarget.Load() { + return nil // No new blocks have been produced } + // We try and produce an empty block to make sure relevant ibc messages will pass through during the batch submission: https://github.com/dymensionxyz/research/issues/173. err := m.ProduceAndGossipBlock(ctx, true) if err != nil { - m.logger.Error("produce and gossip empty block", "error", err) + m.logger.Error("Produce and gossip empty block.", "error", err) } if m.pendingBatch == nil { @@ -76,13 +76,14 @@ func (m *Manager) HandleSubmissionTrigger(ctx context.Context) error { batch: nextBatch, } } else { - m.logger.Info("pending batch already exists", "startHeight", m.pendingBatch.batch.StartHeight, "endHeight", m.pendingBatch.batch.EndHeight) + m.logger.Info("Pending batch already exists.", "startHeight", m.pendingBatch.batch.StartHeight, "endHeight", m.pendingBatch.batch.EndHeight) } syncHeight, err := m.submitPendingBatchToSL(*m.pendingBatch) if err != nil { return fmt.Errorf("submit pending batch to sl: %w", err) } + m.pendingBatch = nil // Update the syncTarget to the height of the last block in the last batch as seen by this node. diff --git a/block/synctarget.go b/block/synctarget.go index 556dafbde..471a0334a 100644 --- a/block/synctarget.go +++ b/block/synctarget.go @@ -7,8 +7,9 @@ import ( "github.com/dymensionxyz/dymint/settlement" ) -// SyncTargetLoop is responsible for getting real time updates about batches submission. -// for non aggregator: updating the sync target which will be used by retrieveLoop to sync until this target. +// SyncTargetLoop is responsible for getting real time updates about settlement batch submissions. +// For non aggregator: updating the sync target which will be used by retrieveLoop to sync until this target. +// It publishes new sync height targets which will then be synced by another process. 
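Note on the submission flow above: `HandleSubmissionTrigger` is guarded by a `TryLock` so overlapping timer ticks cannot submit concurrently, and after this change the pending batch is cleared only once the settlement layer accepts it, so a failed submission is retried on the next tick. A stripped-down sketch of that control flow, using hypothetical types rather than the actual `Manager`:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type batch struct{ start, end uint64 }

type submitter struct {
	mu      sync.Mutex
	pending *batch
	// submitToSL is a stand-in for submitting the pending batch to the settlement layer.
	submitToSL func(batch) error
}

func (s *submitter) handleTrigger(newBatch func() batch) error {
	if !s.mu.TryLock() {
		return errors.New("batch submission already in process, skipping")
	}
	defer s.mu.Unlock()

	if s.pending == nil {
		b := newBatch()
		s.pending = &b
	} // else: a previously failed batch is still pending and is retried now

	if err := s.submitToSL(*s.pending); err != nil {
		return fmt.Errorf("submit pending batch to sl: %w", err) // keep s.pending for the next trigger
	}
	s.pending = nil // clear only once the SL has accepted the batch
	return nil
}

func main() {
	calls := 0
	s := &submitter{submitToSL: func(b batch) error {
		calls++
		if calls == 1 {
			return errors.New("transient SL error") // first attempt fails
		}
		return nil
	}}
	mk := func() batch { return batch{start: 1, end: 10} }

	fmt.Println(s.handleTrigger(mk)) // submit pending batch to sl: transient SL error
	fmt.Println(s.handleTrigger(mk)) // <nil> — the same batch is retried and accepted
}
```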
func (m *Manager) SyncTargetLoop(ctx context.Context) { m.logger.Info("Started sync target loop") subscription, err := m.Pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted) diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index c5acfbba3..0bd4db297 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -2,6 +2,7 @@ package celestia import ( "context" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -319,6 +320,7 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout) defer cancel() + c.logger.Debug("Celestia DA getting blob", "height", daMetaData.Height, "namespace", hex.EncodeToString(daMetaData.Namespace), "commitment", hex.EncodeToString(daMetaData.Commitment)) var batches []*types.Batch blob, err := c.rpc.Get(ctx, daMetaData.Height, daMetaData.Namespace, daMetaData.Commitment) if err != nil { @@ -345,6 +347,9 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet if err != nil { c.logger.Error("unmarshal block", "daHeight", daMetaData.Height, "error", err) } + + c.logger.Debug("Celestia DA get blob successful", "DA height", daMetaData.Height, "lastBlockHeight", batch.EndHeight) + parsedBatch := new(types.Batch) err = parsedBatch.FromProto(&batch) if err != nil { diff --git a/da/celestia/config.go b/da/celestia/config.go index 245406311..21fd1c6b1 100644 --- a/da/celestia/config.go +++ b/da/celestia/config.go @@ -12,7 +12,7 @@ import ( ) const ( - defaultRpcRetryDelay = 30 * time.Second + defaultRpcRetryDelay = 1 * time.Second defaultRpcCheckAttempts = 10 namespaceVersion = 0 defaultGasPrices = 0.1 @@ -20,9 +20,8 @@ const ( ) var defaultSubmitBackoff = uretry.NewBackoffConfig( - uretry.WithInitialDelay(time.Second*4), - uretry.WithMaxDelay(time.Second*30), - uretry.WithGrowthFactor(1.6), + uretry.WithInitialDelay(time.Second*6), + uretry.WithMaxDelay(time.Second*6), ) // Config stores Celestia DALC configuration parameters. 
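Note on the backoff change above: with the initial and maximum delays both set to 6s and no growth factor, `defaultSubmitBackoff` degenerates into a fixed 6-second delay — which is what the "use fixed delay in repeat attempts" changelog entry refers to and what the new `maximum` test further down exercises. A small sketch using the same `uretry` options shown in this diff (the import path is assumed to be `github.com/dymensionxyz/dymint/utils/retry`):

```go
package main

import (
	"fmt"
	"time"

	uretry "github.com/dymensionxyz/dymint/utils/retry" // assumed import path
)

func main() {
	// Same shape as defaultSubmitBackoff after this change: initial == max, so the
	// delay can never grow and every retry waits the same amount of time.
	cfg := uretry.NewBackoffConfig(
		uretry.WithInitialDelay(6*time.Second),
		uretry.WithMaxDelay(6*time.Second),
	)

	b := cfg.Backoff()
	for i := 0; i < 3; i++ {
		fmt.Println(b.Delay()) // 6s, 6s, 6s — effectively a fixed delay
	}
}
```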
@@ -42,7 +41,7 @@ type Config struct { var CelestiaDefaultConfig = Config{ BaseURL: "http://127.0.0.1:26658", AppNodeURL: "", - Timeout: 30 * time.Second, + Timeout: 5 * time.Second, Fee: 0, GasLimit: 20000000, GasPrices: defaultGasPrices, diff --git a/types/serialization.go b/types/serialization.go index 79d6cddb2..814dd3ea5 100644 --- a/types/serialization.go +++ b/types/serialization.go @@ -261,7 +261,6 @@ func (s *State) ToProto() (*pb.State, error) { ChainId: s.ChainID, InitialHeight: s.InitialHeight, LastBlockHeight: s.LastBlockHeight, - SLStateIndex: s.SLStateIndex, LastBlockID: s.LastBlockID.ToProto(), LastBlockTime: s.LastBlockTime, NextValidators: nextValidators, @@ -292,7 +291,6 @@ func (s *State) FromProto(other *pb.State) error { s.LastStoreHeight = other.LastStoreHeight } s.BaseHeight = other.BaseHeight - s.SLStateIndex = other.SLStateIndex lastBlockID, err := types.BlockIDFromProto(&other.LastBlockID) if err != nil { return err diff --git a/types/state.go b/types/state.go index 7646e175c..4715bf034 100644 --- a/types/state.go +++ b/types/state.go @@ -37,9 +37,6 @@ type State struct { LastBlockID types.BlockID LastBlockTime time.Time - // SLStateIndex identifies the Settlement Layer state index we're synced with - SLStateIndex uint64 - // In the MVP implementation, there will be only one Validator NextValidators *types.ValidatorSet Validators *types.ValidatorSet diff --git a/utils/event/funcs.go b/utils/event/funcs.go index 11d5ceb12..696af1226 100644 --- a/utils/event/funcs.go +++ b/utils/event/funcs.go @@ -20,9 +20,8 @@ func MustSubscribe( eventQuery pubsub.Query, callback func(event pubsub.Message), logger types.Logger, - outCapacity ...int, ) { - subscription, err := pubsubServer.Subscribe(ctx, clientID, eventQuery, outCapacity...) + subscription, err := pubsubServer.SubscribeUnbuffered(ctx, clientID, eventQuery) if err != nil { logger.Error("subscribe to events") panic(err) diff --git a/utils/retry/backoff_test.go b/utils/retry/backoff_test.go index 37c732f36..8a03d0a27 100644 --- a/utils/retry/backoff_test.go +++ b/utils/retry/backoff_test.go @@ -34,4 +34,11 @@ func TestBackoff(t *testing.T) { last = d } }) + t.Run("maximum", func(t *testing.T) { + d := time.Second + b := NewBackoffConfig(WithInitialDelay(time.Second), WithMaxDelay(d)).Backoff() + for range 10 { + require.Equal(t, time.Second, b.Delay()) + } + }) }
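Closing note: the `queryStateIndex` helper added in `block/retriever.go` above applies the same fixed-delay idea via retry-go's "retry forever" mode. A self-contained sketch of that pattern, with a stand-in for `SLClient.GetHeightState`; the option calls mirror the ones used in the diff:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go/v4"
)

func main() {
	attempts := 0
	// Stand-in for SLClient.GetHeightState: fails twice, then succeeds,
	// as a flaky settlement RPC might.
	getHeightState := func() (uint64, error) {
		attempts++
		if attempts < 3 {
			return 0, errors.New("settlement layer not reachable")
		}
		return 42, nil
	}

	var stateIndex uint64
	err := retry.Do(
		func() error {
			idx, err := getHeightState()
			if err != nil {
				return err
			}
			stateIndex = idx
			return nil
		},
		retry.Attempts(0),                 // 0 means retry until the function succeeds
		retry.Delay(500*time.Millisecond), // wait the same amount of time between attempts
		retry.DelayType(retry.FixedDelay), // no exponential growth
		retry.LastErrorOnly(true),         // if it ever gives up, report only the last error
	)
	fmt.Println(stateIndex, err) // 42 <nil>, after two failed attempts
}
```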