fix(bug): sync from da and p2p when starting a node #763

Merged May 2, 2024 · 32 commits (diff shown from 27 commits)

Commits
7b70862
finish impl but without fixing tests, and sync call is in wrong place
danwt May 1, 2024
8b0a032
move the call
danwt May 1, 2024
de0ea98
add a sanity check log
danwt May 1, 2024
67d30ac
do not sync celestia on boot
danwt May 1, 2024
1487d79
remove process next batch
danwt May 1, 2024
3db98d0
keep retrying
danwt May 1, 2024
66092f9
comment out celestia
danwt May 1, 2024
8725393
revert all temp changes
danwt May 1, 2024
bac7765
fixing da syncing
srene May 1, 2024
656b017
readd availability check
srene May 2, 2024
a8fdcd1
changing default parameters + logging
srene May 2, 2024
5208955
setting channel size to gossip cache value
srene May 2, 2024
e03ae2c
Merge 5208955ccc4613900609cbab080899612941e94e into 43e2d965f2b505751…
srene May 2, 2024
f5b7981
Update CHANGELOG.md [skip ci]
invalid-email-address May 2, 2024
f6adc27
using unbuffered subscriptions
srene May 2, 2024
4c4442d
removing unused func
srene May 2, 2024
78780ed
refac/simplify sync until target
danwt May 2, 2024
132fc6e
make sync until target more readable
danwt May 2, 2024
02a41ba
add comment for retriever
danwt May 2, 2024
b600148
remove state index
danwt May 2, 2024
b3ff35e
rebuild proto
danwt May 2, 2024
f2df961
docstrings
danwt May 2, 2024
b343287
docstrings
danwt May 2, 2024
53002b0
fix(celestia): use fixed delay in repeat attempts (#753)
danwt May 2, 2024
a25d74b
readding mutex
srene May 2, 2024
09e7199
Merge a25d74b2a46a53491047112c0f45ab3c66370f17 into 53002b0a070743811…
srene May 2, 2024
4cc17dd
Update CHANGELOG.md [skip ci]
invalid-email-address May 2, 2024
c7f6e7a
fix logging
srene May 2, 2024
b88bf6e
fix(doc): manager cache comment (#767)
danwt May 2, 2024
5cee763
add state index back to proto
danwt May 2, 2024
d101799
Merge 5cee7634fbd013ffcfc4ed8af0d3f8174f390b51 into b88bf6e72820c944b…
srene May 2, 2024
91bb364
Update CHANGELOG.md [skip ci]
invalid-email-address May 2, 2024
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -1,17 +1,19 @@
# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-05-01)
# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-05-02)


### Bug Fixes

* **celestia test:** fix race in test ([#755](https://github.com/dymensionxyz/dymint/issues/755)) ([0b36781](https://github.com/dymensionxyz/dymint/commit/0b367818bf6aa8da4a4fd8e4e5c78223b60b44e0))
* **celestia:** impl retry on submit ([#748](https://github.com/dymensionxyz/dymint/issues/748)) ([61630eb](https://github.com/dymensionxyz/dymint/commit/61630eb458197abe2440a81426210000dff25d40))
* **celestia:** use fixed delay in repeat attempts ([#753](https://github.com/dymensionxyz/dymint/issues/753)) ([53002b0](https://github.com/dymensionxyz/dymint/commit/53002b0a070743811295a98580ba038cac40cc7d))
* **da:** fixed da path separator and encoding issue ([#731](https://github.com/dymensionxyz/dymint/issues/731)) ([3a3b219](https://github.com/dymensionxyz/dymint/commit/3a3b21932750fee7eaaa9c186f78e36e3e597746))
* **DA:** use expo backoff in retries ([#739](https://github.com/dymensionxyz/dymint/issues/739)) ([848085f](https://github.com/dymensionxyz/dymint/commit/848085f70bcaae81fb80da3ab78c4d8b399e13b1))
* **logging:** added reason for websocket closed debug msg ([#746](https://github.com/dymensionxyz/dymint/issues/746)) ([3aa7d80](https://github.com/dymensionxyz/dymint/commit/3aa7d80ace92b3b0f79e4f338f10bb94c96ab6dd))
* **logs:** make logs more readable in a couple places, fix race cond ([#749](https://github.com/dymensionxyz/dymint/issues/749)) ([f05ef39](https://github.com/dymensionxyz/dymint/commit/f05ef3957b754c05fbc90aa39eabce80bbe65933))
* **p2p:** validate block before applying and not before caching in p2p gossiping ([#723](https://github.com/dymensionxyz/dymint/issues/723)) ([98371b5](https://github.com/dymensionxyz/dymint/commit/98371b5220613e70f3274fab5593e02ba532f7db))
* **produce loop:** handle unauthenticated error in settlement layer ([#726](https://github.com/dymensionxyz/dymint/issues/726)) ([33e78d1](https://github.com/dymensionxyz/dymint/commit/33e78d116b5f14b91b8b3bda2b6cbfee9040e2d3))
* **rpc:** nil panic in rpc/json/handler.go WriteError ([#750](https://github.com/dymensionxyz/dymint/issues/750)) ([e09709b](https://github.com/dymensionxyz/dymint/commit/e09709b428a33da002defb9f13178fa19b81a69b))
* **sync:** make sure we use a latest state index as a start point ([#760](https://github.com/dymensionxyz/dymint/issues/760)) ([43e2d96](https://github.com/dymensionxyz/dymint/commit/43e2d965f2b505751f8e5260549e909c976141ee))



13 changes: 6 additions & 7 deletions block/manager.go
@@ -167,18 +167,20 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
}
}

if !isAggregator {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

err := m.syncBlockManager()
if err != nil {
err = fmt.Errorf("sync block manager: %w", err)
return err
return fmt.Errorf("sync block manager: %w", err)
}

if isAggregator {
go uevent.MustSubscribe(ctx, m.Pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
}
@@ -237,8 +239,7 @@ func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
// onNewGossipedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
m.retrieverMutex.Lock() // needed to protect blockCache access

m.logger.Debug("Received new block via gossip", "n cachedBlocks", len(m.blockCache))
m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.blockCache))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
@@ -251,9 +252,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
}
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.Store.Height())
}

m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

if block.Header.Height == nextHeight {
err := m.attemptApplyCachedBlocks()
if err != nil {
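
This hunk contains the heart of the fix: the gossip subscription now starts before the initial sync, so blocks gossiped while the node is still catching up from the DA are cached by onNewGossipedBlock and applied once their height is reached, instead of being missed. A minimal sketch of the resulting startup ordering, condensed from the diff above (not the verbatim implementation):

```go
func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
	// Subscribe to gossiped blocks *before* syncing, so blocks arriving
	// during catch-up are cached by onNewGossipedBlock rather than dropped.
	if !isAggregator {
		go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop",
			p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
	}

	// Catch up from the settlement layer and DA before starting the loops.
	if err := m.syncBlockManager(); err != nil {
		return fmt.Errorf("sync block manager: %w", err)
	}

	// Only then start the steady-state loops.
	if isAggregator {
		go uevent.MustSubscribe(ctx, m.Pubsub, "nodeHealth",
			events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
		go m.ProduceBlockLoop(ctx)
		go m.SubmitLoop(ctx)
	} else {
		go m.RetrieveLoop(ctx)
		go m.SyncTargetLoop(ctx)
	}
	return nil
}
```

Subscribing first and syncing second is what closes the startup gap named in the PR title: the node now syncs from both the DA and p2p when starting.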
3 changes: 0 additions & 3 deletions block/manager_test.go
@@ -95,7 +95,6 @@ func TestInitialState(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {

dalc := testutil.GetMockDALC(logger)
agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
nil, pubsubServer, p2pClient, logger)
@@ -162,7 +161,6 @@ func TestRetrieveDaBatchesFailed(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, manager)

t.Log(manager.LastState.SLStateIndex)
daMetaData := &da.DASubmitMetaData{
Client: da.Mock,
Height: 1,
@@ -472,7 +470,6 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
}

func TestDAFetch(t *testing.T) {

require := require.New(t)
// Setup app
app := testutil.GetAppMock(testutil.Info, testutil.Commit)
99 changes: 41 additions & 58 deletions block/retriever.go
@@ -3,7 +3,6 @@ package block
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
@@ -12,8 +11,9 @@ import (
"github.com/dymensionxyz/dymint/da"
)

// RetrieveLoop listens for new sync messages written to a ring buffer and in turn
// runs syncUntilTarget on the latest message in the ring buffer.
// RetrieveLoop listens for new target sync heights and then syncs the chain by
// fetching batches from the settlement layer and then fetching the actual blocks
// from the DA.
func (m *Manager) RetrieveLoop(ctx context.Context) {
m.logger.Info("started retrieve loop")
syncTargetPoller := diodes.NewPoller(m.SyncTargetDiode, diodes.WithPollingContext(ctx))
@@ -24,88 +24,71 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
return
default:
// Get only the latest sync target
syncTarget := syncTargetPoller.Next()
err := m.syncUntilTarget(*(*uint64)(syncTarget))
targetHeight := syncTargetPoller.Next()
err := m.syncUntilTarget(*(*uint64)(targetHeight))
if err != nil {
panic(fmt.Errorf("sync until target: %w", err))
}
}
}
}

// syncUntilTarget syncs the block until the syncTarget is reached.
// syncUntilTarget syncs blocks until the target height is reached.
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncUntilTarget(syncTarget uint64) error {
currentHeight := m.Store.Height()
func (m *Manager) syncUntilTarget(targetHeight uint64) error {
for currH := m.Store.Height(); currH < targetHeight; {

if currentHeight >= syncTarget {
m.logger.Info("Already synced", "current height", currentHeight, "syncTarget", syncTarget)
return nil
}

var stateIndex uint64
h := m.Store.Height()
err := retry.Do(
func() error {
res, err := m.SLClient.GetHeightState(h)
if err != nil {
m.logger.Debug("sl client get height state", "error", err)
return err
}
stateIndex = res.State.StateIndex
return nil
},
retry.Attempts(0),
retry.Delay(500*time.Millisecond),
retry.LastErrorOnly(true),
retry.DelayType(retry.FixedDelay),
)
if err != nil {
return fmt.Errorf("get height state: %w", err)
}
m.updateStateIndex(stateIndex - 1)
m.logger.Debug("Sync until target: updated state index pre loop", "stateIndex", stateIndex, "height", h, "syncTarget", syncTarget)

for currentHeight < syncTarget {
currStateIdx := atomic.LoadUint64(&m.LastState.SLStateIndex) + 1
m.logger.Info("Syncing until target", "height", currentHeight, "state_index", currStateIdx, "syncTarget", syncTarget)
settlementBatch, err := m.SLClient.RetrieveBatch(currStateIdx)
// It's important that we query the state index before fetching the batch, rather
// than e.g. keep it and increment it, because we might be concurrently applying blocks
// and may require a higher index than expected.
stateIndex, err := m.queryStateIndex()
if err != nil {
return err
return fmt.Errorf("query state index: %w", err)
}

err = m.ProcessNextDABatch(settlementBatch.MetaData.DA)
settlementBatch, err := m.SLClient.RetrieveBatch(stateIndex)
if err != nil {
return err
return fmt.Errorf("retrieve batch: %w", err)
}

currentHeight = m.Store.Height()
m.logger.Info("Retrieved batch.", "state_index", stateIndex)

err = m.updateStateIndex(settlementBatch.StateIndex)
err = m.ProcessNextDABatch(settlementBatch.MetaData.DA)
if err != nil {
return err
return fmt.Errorf("process next DA batch: %w", err)
}

}
m.logger.Info("Synced", "current height", currentHeight, "syncTarget", syncTarget)

// check for cached blocks
err = m.attemptApplyCachedBlocks()
m.logger.Info("Synced", "store height", m.Store.Height(), "target height", targetHeight)

err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying previous cached blocks", "err", err)
m.logger.Error("Attempt apply cached blocks.", "err", err)
}

return nil
}

func (m *Manager) updateStateIndex(stateIndex uint64) error {
atomic.StoreUint64(&m.LastState.SLStateIndex, stateIndex)
_, err := m.Store.UpdateState(m.LastState, nil)
if err != nil {
m.logger.Error("update state", "error", err)
return err
}
return nil
// TODO: we could encapsulate the retry in the SL client
func (m *Manager) queryStateIndex() (uint64, error) {
var stateIndex uint64
return stateIndex, retry.Do(
func() error {
res, err := m.SLClient.GetHeightState(m.Store.Height() + 1)
if err != nil {
m.logger.Debug("sl client get height state", "error", err)
return err
}
stateIndex = res.State.StateIndex
return nil
},
retry.Attempts(0), // try forever
retry.Delay(500*time.Millisecond),
retry.LastErrorOnly(true),
retry.DelayType(retry.FixedDelay),
)
}

func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {
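
queryStateIndex above retries indefinitely with a fixed half-second delay. A standalone sketch of that retry-go policy — only retry.Do and the options already used in the diff are assumed; the failing-then-succeeding function is illustrative:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go/v4"
)

func main() {
	attempts := 0
	err := retry.Do(
		func() error {
			attempts++
			if attempts < 3 {
				// e.g. the settlement layer has no state for this height yet
				return errors.New("not ready")
			}
			return nil
		},
		retry.Attempts(0), // 0 = retry until the function succeeds
		retry.Delay(500*time.Millisecond),
		retry.DelayType(retry.FixedDelay), // no exponential growth
		retry.LastErrorOnly(true),         // surface only the final error
	)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Querying the index fresh on each loop iteration, rather than caching and incrementing it, matters because gossiped blocks may be applied concurrently, as the in-loop comment notes.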
1 change: 0 additions & 1 deletion block/state.go
@@ -37,7 +37,6 @@ func (e *Executor) updateState(state types.State, block *types.Block, abciRespon
Version: state.Version,
ChainID: state.ChainID,
InitialHeight: state.InitialHeight,
SLStateIndex: state.SLStateIndex,
LastBlockHeight: int64(block.Header.Height),
LastBlockTime: time.Unix(0, int64(block.Header.Time)),
LastBlockID: tmtypes.BlockID{
19 changes: 10 additions & 9 deletions block/submit.go
@@ -12,6 +12,7 @@ import (
"github.com/dymensionxyz/dymint/types"
)

// SubmitLoop submits a batch of blocks to the DA and SL layers on a time interval.
func (m *Manager) SubmitLoop(ctx context.Context) {
ticker := time.NewTicker(m.Conf.BatchSubmitMaxTime)
defer ticker.Stop()
@@ -38,26 +39,25 @@
}
}

// HandleSubmissionTrigger processes the submission trigger event. It checks if there are new blocks produced since the last submission.
// HandleSubmissionTrigger processes the sublayer submission trigger event. It checks if there are new blocks produced since the last submission.
// If there are, it attempts to submit a batch of blocks. It then attempts to produce an empty block to ensure IBC messages
// pass through during the batch submission process, because the proofs required for IBC messages only exist on the next block.
// Finally, it submits the next batch of blocks and updates the sync target to the height of
// the last block in the submitted batch.
// Finally, it submits the next batch of blocks and updates the sync target to the height of the last block in the submitted batch.
func (m *Manager) HandleSubmissionTrigger(ctx context.Context) error {
if !m.submitBatchMutex.TryLock() {
return fmt.Errorf("batch submission already in process, skipping submission: %w", gerr.ErrAborted)
}
defer m.submitBatchMutex.Unlock() // Ensure unlocking at the end
defer m.submitBatchMutex.Unlock()

// Load current sync target and height to determine if new blocks are available for submission.
syncTarget, height := m.SyncTarget.Load(), m.Store.Height()
if height <= syncTarget { // Check if there are new blocks since last sync target.
return nil // Exit if no new blocks are produced.
if m.Store.Height() <= m.SyncTarget.Load() {
return nil // No new blocks have been produced
}

// We try and produce an empty block to make sure relevant ibc messages will pass through during the batch submission: https://github.com/dymensionxyz/research/issues/173.
err := m.ProduceAndGossipBlock(ctx, true)
if err != nil {
m.logger.Error("produce and gossip empty block", "error", err)
m.logger.Error("Produce and gossip empty block.", "error", err)
}

if m.pendingBatch == nil {
@@ -76,13 +76,14 @@ func (m *Manager) HandleSubmissionTrigger(ctx context.Context) error {
batch: nextBatch,
}
} else {
m.logger.Info("pending batch already exists", "startHeight", m.pendingBatch.batch.StartHeight, "endHeight", m.pendingBatch.batch.EndHeight)
m.logger.Info("Pending batch already exists.", "startHeight", m.pendingBatch.batch.StartHeight, "endHeight", m.pendingBatch.batch.EndHeight)
}

syncHeight, err := m.submitPendingBatchToSL(*m.pendingBatch)
if err != nil {
return fmt.Errorf("submit pending batch to sl: %w", err)
}

m.pendingBatch = nil

// Update the syncTarget to the height of the last block in the last batch as seen by this node.
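
HandleSubmissionTrigger guards against overlapping submissions with a non-blocking TryLock rather than queueing behind the mutex: a trigger that fires while a submission is in flight is skipped, not delayed. A minimal standalone sketch of that pattern (Go 1.18+ for sync.Mutex.TryLock; the submitter type and names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type submitter struct {
	mu sync.Mutex
}

// trySubmit runs at most one submission at a time; concurrent triggers
// bail out instead of blocking, mirroring HandleSubmissionTrigger.
func (s *submitter) trySubmit(id int) {
	if !s.mu.TryLock() {
		fmt.Printf("trigger %d: submission already in process, skipping\n", id)
		return
	}
	defer s.mu.Unlock()
	fmt.Printf("trigger %d: submitting batch\n", id)
	time.Sleep(50 * time.Millisecond) // stand-in for the actual submission
}

func main() {
	var s submitter
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) { defer wg.Done(); s.trySubmit(i) }(i)
	}
	wg.Wait()
}
```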
5 changes: 3 additions & 2 deletions block/synctarget.go
@@ -7,8 +7,9 @@ import (
"github.com/dymensionxyz/dymint/settlement"
)

// SyncTargetLoop is responsible for getting real time updates about batches submission.
// for non aggregator: updating the sync target which will be used by retrieveLoop to sync until this target.
// SyncTargetLoop is responsible for getting real time updates about settlement batch submissions.
// For non aggregator: updating the sync target which will be used by retrieveLoop to sync until this target.
// It publishes new sync height targets which will then be synced by another process.
func (m *Manager) SyncTargetLoop(ctx context.Context) {
m.logger.Info("Started sync target loop")
subscription, err := m.Pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
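
SyncTargetLoop publishes target heights into the SyncTargetDiode that RetrieveLoop drains through a poller (see the diff in block/retriever.go above). A diode keeps only the newest value, so a consumer that falls behind syncs straight to the most recent target rather than replaying stale ones. A standalone sketch of that handoff — the go-diodes import path is an assumption, but NewPoller, WithPollingContext, and the uint64 pointer round-trip all appear in the diff:

```go
package main

import (
	"context"
	"fmt"
	"unsafe"

	diodes "code.cloudfoundry.org/go-diodes"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A one-to-one diode of size 1 overwrites old entries, so the reader
	// always observes the most recently written sync target.
	d := diodes.NewOneToOne(1, nil)
	poller := diodes.NewPoller(d, diodes.WithPollingContext(ctx))

	// Producer side (SyncTargetLoop): publish successive target heights.
	for _, h := range []uint64{10, 20, 30} {
		h := h
		d.Set(diodes.GenericDataType(unsafe.Pointer(&h)))
	}

	// Consumer side (RetrieveLoop): Next blocks until a value is available.
	target := *(*uint64)(poller.Next())
	fmt.Println("syncing to target", target) // the latest write wins
}
```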
5 changes: 5 additions & 0 deletions da/celestia/celestia.go
@@ -2,6 +2,7 @@ package celestia

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
@@ -319,6 +320,7 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet
ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
defer cancel()

c.logger.Debug("Celestia DA getting blob", "height", daMetaData.Height, "namespace", hex.EncodeToString(daMetaData.Namespace), "commitment", hex.EncodeToString(daMetaData.Commitment))
var batches []*types.Batch
blob, err := c.rpc.Get(ctx, daMetaData.Height, daMetaData.Namespace, daMetaData.Commitment)
if err != nil {
@@ -345,6 +347,9 @@
if err != nil {
c.logger.Error("unmarshal block", "daHeight", daMetaData.Height, "error", err)
}

c.logger.Debug("Celestia DA get blob successful", "DA height", daMetaData.Height, "lastBlockHeight", batch.EndHeight)

parsedBatch := new(types.Batch)
err = parsedBatch.FromProto(&batch)
if err != nil {
9 changes: 4 additions & 5 deletions da/celestia/config.go
@@ -12,17 +12,16 @@
)

const (
defaultRpcRetryDelay = 30 * time.Second
defaultRpcRetryDelay = 1 * time.Second
defaultRpcCheckAttempts = 10
namespaceVersion = 0
defaultGasPrices = 0.1
defaultGasAdjustment float64 = 1.3
)

var defaultSubmitBackoff = uretry.NewBackoffConfig(
uretry.WithInitialDelay(time.Second*4),
uretry.WithMaxDelay(time.Second*30),
uretry.WithGrowthFactor(1.6),
uretry.WithInitialDelay(time.Second*6),
uretry.WithMaxDelay(time.Second*6),
)

// Config stores Celestia DALC configuration parameters.
@@ -42,7 +41,7 @@
var CelestiaDefaultConfig = Config{
BaseURL: "http://127.0.0.1:26658",
AppNodeURL: "",
Timeout: 30 * time.Second,
Timeout: 5 * time.Second,
Fee: 0,
GasLimit: 20000000,
GasPrices: defaultGasPrices,
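
These config changes tighten Celestia retry timing: the RPC retry delay drops from 30s to 1s, the client timeout from 30s to 5s, and the submit backoff pins both initial and max delay to 6s, so the growth factor never engages and retries fire at a fixed cadence. A standalone sketch contrasting the old and new submit schedules, assuming the usual capped-exponential form delay(n) = min(initial·growth^n, max) — whether uretry compounds exactly this way is an assumption:

```go
package main

import (
	"fmt"
	"time"
)

// schedule returns the first n retry delays for a capped exponential backoff.
func schedule(initial, max time.Duration, growth float64, n int) []time.Duration {
	out := make([]time.Duration, 0, n)
	d := initial
	for i := 0; i < n; i++ {
		if d > max {
			d = max // clamp to the configured maximum
		}
		out = append(out, d)
		d = time.Duration(float64(d) * growth)
	}
	return out
}

func main() {
	// old: 4s initial, 30s cap, 1.6 growth -> 4s, 6.4s, 10.24s, ... then 30s
	fmt.Println("old:", schedule(4*time.Second, 30*time.Second, 1.6, 6))
	// new: initial == cap == 6s -> a fixed 6s cadence, growth never engages
	fmt.Println("new:", schedule(6*time.Second, 6*time.Second, 1.6, 6))
}
```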
2 changes: 0 additions & 2 deletions proto/types/dymint/state.proto
@@ -22,8 +22,6 @@ message State {
tendermint.types.BlockID last_block_id = 5 [(gogoproto.nullable) = false, (gogoproto.customname) = "LastBlockID"];
google.protobuf.Timestamp last_block_time = 6 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true];

uint64 sl_state_index = 7 [(gogoproto.customname) = "SLStateIndex"];

tendermint.types.ValidatorSet next_validators = 8;
tendermint.types.ValidatorSet validators = 9;
tendermint.types.ValidatorSet last_validators = 10;